John Jiang

          a cup of Java, cheers!
          https://github.com/johnshajiang/blog

             :: 首頁 ::  :: 聯(lián)系 :: 聚合  :: 管理 ::
            131 隨筆 :: 1 文章 :: 530 評論 :: 0 Trackbacks
          <2013年10月>
          293012345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          留言簿(3)

          隨筆分類(415)

          隨筆檔案(130)

          文章分類

          Attach

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          Java并發(fā)基礎(chǔ)實踐--分而治之
          本系列的第三篇文章將以實現(xiàn)一個極簡單的查找最大數(shù)的任務(wù)為例,分別給出了四個版本:1.順序執(zhí)行;2.基于傳統(tǒng)的Thread.join();3.基于并發(fā)工具包的Future;4.基于JDK 7引入的Fork/Join框架。(2013.10.25最后更新)

              分而治之(Divide-and-Conquer)是解決復(fù)雜問題的常用方法。在并發(fā)應(yīng)用中,可將一個復(fù)雜算法分解為若干子問題,然后使用獨(dú)立的線程去執(zhí)行解決子問題的程序,之后再將各個子問題的結(jié)果逐級進(jìn)行合并以得到最終結(jié)果。在特定環(huán)境中,這種方式可能較好地提高程序的執(zhí)行效率。

          方案1:順序執(zhí)行
              找出對給定整形數(shù)組中的最大值,使用的方法很簡單,就是逐一遍歷每個元素,將當(dāng)前元素與當(dāng)前最大值進(jìn)行比較,若當(dāng)前元素的值更大,則將該值作為新的當(dāng)前最大值,再去與下一個元素進(jìn)行比較,如此反復(fù)。在編寫并發(fā)程序來實現(xiàn)這個算法之前,本文將先給出一個順序執(zhí)行的實現(xiàn)版本,但依然利用了分而治之的思想。即,先將給定數(shù)列分割成若干較小的子數(shù)列,找出各個子數(shù)列的最大值,將這些最大值組成一個新的數(shù)列,然后再使用同樣的方法對這個新數(shù)列進(jìn)行分割與最大值合并,...依次類推,直至找到最大值。
              代碼清單1中的MaxNumberFinder是本文的基礎(chǔ)類:1.getMaxNumber()方法展示了如何查找一個數(shù)列中的最大值;2.使用工具方法getNumberArray()/createNumberArray()可以創(chuàng)建指定長度的隨機(jī)整數(shù)數(shù)列;3.方法findMaxNumber()展示了將一個數(shù)列分割為子數(shù)列(子數(shù)列的最大長度不超過指定值THRESHOLD),并查找子數(shù)列的最大值,以及將子數(shù)列的最大值組成各級新的中間數(shù)列去查找其最大值。
          清單1
          public class MaxNumberFinder {

              
          public static final int THRESHOLD = 50;
              
          private static final Random random = new Random();

              
          public static int[] getNumberArray() {
                  
          return createNumberArray(3000);
              }

              
          private static int[] createNumberArray(int capacity) {
                  
          int[] numbers = new int[capacity];
                  
          for (int i = 0; i < numbers.length; i++) {
                      numbers[i] 
          = random.nextInt(capacity);
                  }

                  
          return numbers;
              }

              
          public static int getMaxNumber(int[] numbers) {
                  
          if (numbers.length == 0) {
                      
          return Integer.MIN_VALUE;
                  }

                  
          int max = numbers[0];
                  
          for (int i = 1; i < numbers.length; i++) {
                      
          if (numbers[i] > max) {
                          max 
          = numbers[i];
                      }
                  }

                  
          return max;
              }

              
          private static int findMaxNumber(int[] numbers) {
                  
          // interim max number array
                  int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];

                  
          for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
                      
          final int[] subNumbers = new int[THRESHOLD];
                      System.arraycopy(numbers, i, subNumbers, 
          0, subNumbers.length);
                      maxNumbers[i 
          / THRESHOLD] = getMaxNumber(subNumbers);
                  }

                  
          if (numbers.length % THRESHOLD != 0) {
                      
          int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
                      System.arraycopy(numbers, numbers.length 
          - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
                      maxNumbers[maxNumbers.length 
          - 1= getMaxNumber(lastSubNumbers);
                  }

                  
          // if the length of interim max number array is greater than threshold,
                  
          // it must divide-and-search recursively.
                  if (maxNumbers.length > THRESHOLD) {
                      
          return findMaxNumber(maxNumbers);
                  } 
          else {
                      
          return getMaxNumber(maxNumbers);
                  }
              }
          }

          方案2:基于Thread.join()
              基于方法MaxNumberFinder.findMaxNumber()的實現(xiàn),在每次分割之后得到的數(shù)列,以及合并得到的中間最大值的數(shù)列,都可以使用獨(dú)立的線程去分別查找它們的最大值,如代碼清單2所示。
          清單2
          public class MaxNumberFinderOnThread {

              
          private static int findMaxNumber(int[] numbers) {
                  
          // interim max number array
                  final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];

                  
          // the threads for searching max value in sub number array
                  List<Thread> threads = new ArrayList<Thread>();

                  
          for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
                      
          final int[] subNumbers = new int[THRESHOLD];
                      System.arraycopy(numbers, i, subNumbers, 
          0, subNumbers.length);
                      
          final int bufIndex = i / THRESHOLD;

                      Thread bufThread 
          = new Thread(new Runnable() {

                          @Override
                          
          public void run() {
                              maxNumbers[bufIndex] 
          = MaxNumberFinder.getMaxNumber(subNumbers);
                          }
                      });
                      bufThread.start();
                      threads.add(bufThread);
                  }

                  
          if (numbers.length % THRESHOLD != 0) {
                      
          final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
                      System.arraycopy(numbers, numbers.length 
          - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);

                      
          final int lastIndex = (numbers.length - 1/ THRESHOLD;
                      Thread lastThread 
          = new Thread(new Runnable() {

                          @Override
                          
          public void run() {
                              maxNumbers[lastIndex] 
          = MaxNumberFinder.getMaxNumber(lastSubNumbers);
                          }
                      });
                      threads.add(lastThread);
                      lastThread.start();
                  }

                  
          // waiting for all of jobs are finished
                  for (int i = 0, size = threads.size(); i < size; i++) {
                      
          try {
                          threads.get(i).join();
                      } 
          catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }

                  
          // if the length of interim max number array is greater than threshold,
                  
          // it must divide-and-search recursively.
                  if (maxNumbers.length > THRESHOLD) {
                      
          return findMaxNumber(maxNumbers);
                  } 
          else {
                      
          return MaxNumberFinder.getMaxNumber(maxNumbers);
                  }
              }
          }
          為了能夠?qū)⑼患壘€程所找的最大值合并成一個新的中間最大值數(shù)列,必須要等待這一組線程全部執(zhí)行完畢。而通過分別調(diào)用每個線程實例中join()方法即可滿足這一要求。
              必須注意的是,若數(shù)列的很長,而每次最多處理的數(shù)列較短(即,THRESHOLD值較小),該方案將會產(chǎn)生較多的線程,消耗大量內(nèi)存。另外,還會有較多的高層線程在等待低層線程的執(zhí)行結(jié)果,這可能會大大影響整個任務(wù)的執(zhí)行效率。

          方案3:基于Future
              方案2使用的是舊有API,根據(jù)本系列上一篇中所提及的并發(fā)工具包中的Future,同樣可以實現(xiàn)這一功能。只需要將使用Thread/Runnable的地方,相應(yīng)地替換成使用Future/Callable即可,如代碼清單3所示。
          清單3
          public class MaxNumberFinderOnFuture {

              
          private static ExecutorService executor = Executors.newCachedThreadPool();

              
          private static int findMaxNumber(int[] numbers) {
                  
          final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
                  List
          <Future<Integer>> futures = new ArrayList<Future<Integer>>();

                  
          for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
                      
          final int[] subNumbers = new int[THRESHOLD];
                      System.arraycopy(numbers, i, subNumbers, 
          0, subNumbers.length);

                      Future
          <Integer> bufFuture = executor.submit(new Callable<Integer>() {

                          @Override
                          
          public Integer call() throws Exception {
                              
          return MaxNumberFinder.getMaxNumber(subNumbers);
                          }
                      });
                      futures.add(bufFuture);
                  }

                  
          if (numbers.length % THRESHOLD != 0) {
                      
          final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
                      System.arraycopy(numbers, numbers.length 
          - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);

                      Future
          <Integer> lastFuture = executor.submit(new Callable<Integer>() {

                          @Override
                          
          public Integer call() throws Exception {
                              
          return MaxNumberFinder.getMaxNumber(lastSubNumbers);
                          }
                      });

                      futures.add(lastFuture);
                  }

                  
          // retrieve results from Futures one by one,
                  
          // get() method will be blocked if the searching isn't finished
                  for (int i = 0, size = futures.size(); i < size; i++) {
                      
          try {
                          maxNumbers[i] 
          = futures.get(i).get();
                      } 
          catch (InterruptedException e) {
                          e.printStackTrace();
                      } 
          catch (ExecutionException e) {
                          e.printStackTrace();
                      }
                  }

                  
          // if the length of interim max number array is greater than threshold,
                  
          // it must divide-and-search recursively.
                  if (maxNumbers.length > THRESHOLD) {
                      
          return findMaxNumber(maxNumbers);
                  } 
          else {
                      
          return MaxNumberFinder.getMaxNumber(maxNumbers);
                  }
              }
          }
              這一版的findMaxNumber()實現(xiàn)中,沒有顯示地調(diào)用Thread.join()方法去等待線程的執(zhí)行結(jié)果。但在任務(wù)執(zhí)行完成之前,調(diào)用Future.get()時會被阻塞,在此處的效果就與Thread.join()相同。

          方案4:基于Fork/Join
              方案2與方案3不一定能夠提高執(zhí)行效率。如果該應(yīng)用程序運(yùn)行在單核處理器上,或者它沒有利用上多核處理器中的多個內(nèi)核,那么它的執(zhí)行時間很有可能要長于方案1所使用的順序執(zhí)行方案,因為線程的創(chuàng)建、調(diào)度、上下文切換都會產(chǎn)生額外的開銷。
              為了更好地適應(yīng)已經(jīng)十分普遍的多核處理器場景,JDK 7引入了Fork/Join框架。如代碼清單4所示,基于該框架提供的RecursiveTask,我們就可以直接地對任務(wù)進(jìn)行分割與合并,程序本身也更為清晰簡潔。
          清單4
          public class NumberFinderOnForkJoin extends RecursiveTask<Integer> {

              
          private static final long serialVersionUID = -5871813408961649666L;

              
          private int[] numbers = null;
              
          private int maxNumber = Integer.MIN_VALUE;

              
          public NumberFinderOnForkJoin(int[] numbers) {
              
              this.numbers = numbers;
              }

              @Override
              
          public Integer compute() {
              
              if (numbers.length <= THRESHOLD) {
               
                 maxNumber = NumberFinder.getMaxNumber(numbers);
             
              else {
             
                   int[] leftNumbers = new int[numbers.length / 2];
                 
               System.arraycopy(numbers, 0, leftNumbers, 0, leftNumbers.length);
                  
              int[] rightNumbers = new int[numbers.length - numbers.length / 2];
                      System.arraycopy(numbers, leftNumbers.length, rightNumbers, 0, rightNumbers.length);

              
                  // divide the task into two sub-tasks.
                      NumberFinderOnForkJoin leftTask = new NumberFinderOnForkJoin(leftNumbers);
              
                  NumberFinderOnForkJoin rightTask = new NumberFinderOnForkJoin(rightNumbers);
                  
              invokeAll(leftTask, rightTask);
                 
               maxNumber = Math.max(leftTask.maxNumber, rightTask.maxNumber);
              
              }

              
              return maxNumber;
              }

              
          public static void main(String[] args) throws InterruptedException, ExecutionException {
                  NumberFinderOnForkJoin task = new NumberFinderOnForkJoin(NumberFinder.getNumberArray());
              
              ForkJoinPool pool = new ForkJoinPool();
                  pool.submit(task);
                  System.out.println(task.get());
              
              pool.shutdown();
              }
          }

              稍稍說明一下,方案4對數(shù)列進(jìn)行分割的方法與前三個方案都不同。方案4是使用的二分法,而前三個方案都是按從前往后的順序依次取出不長于THRESHOLD的子數(shù)列。但這兩種方法沒有本質(zhì)上的區(qū)別,最多只是分割的次數(shù)略有不同罷了。例如,一個長度為9的數(shù)列,將THRESHOLD的值設(shè)為3。若用二分法,第一輪會分割出4個子數(shù)列,其長度分別為3,2,3和1,后面還需要進(jìn)行一輪分割;但若用順序法,分割將得到的3個子數(shù)列,其長度均為3,之后不需要再進(jìn)行分割了。

          小結(jié)
              分解任務(wù),各個擊破,是應(yīng)對復(fù)雜問題的慣用伎倆。在資源充足的情況下,應(yīng)該盡可能地利用空閑的計算資源。Java并發(fā)工具包提供了適應(yīng)多核環(huán)境的運(yùn)行框架,使應(yīng)用程序能更高效地利用多核處理器。
              但對執(zhí)行方案的選定,包括THRESHOLD的值,依然要基于性能測試。對于本文的例子,在我的測試環(huán)境中,方案1其實是最高的。在并發(fā)執(zhí)行方案中,方案2會明顯慢于方案3和方案4,而方案3與方案4之間則難分伯仲。

          posted on 2013-10-23 23:27 John Jiang 閱讀(4267) 評論(0)  編輯  收藏 所屬分類: JavaSE 、Java 、Concurrency 、原創(chuàng)Java并發(fā)基礎(chǔ)實踐
          主站蜘蛛池模板: 旌德县| 长葛市| 连平县| 潜江市| 贵州省| 万年县| 海兴县| 定陶县| 玛曲县| 黑山县| 西乌珠穆沁旗| 安塞县| 西藏| 石景山区| 三明市| 新余市| 靖宇县| 满洲里市| 平乡县| 新巴尔虎左旗| 靖江市| 崇文区| 靖边县| 隆安县| 兖州市| 惠安县| 尚义县| 廉江市| 灌南县| 天水市| 邛崃市| 英超| 南安市| 皋兰县| 墨江| 景德镇市| 攀枝花市| 武功县| 循化| 屏边| 翁牛特旗|