Concurrent學習—Executor框架


              java.util.concurrent包分成了三個部分,分別是:
                                     java.util.concurrent
           
                                     java.util.concurrent.atomic
                                     java.util.concurrent.lock
              
          內容涵蓋了并發集合類、線程池機制、同步互斥機制、線程安全的變量更新工具類、鎖等等常用工具。 

           

          并發編程的一種編程方式是把任務拆分為一些列的小任務,即Runnable,然后在提交給一個Executor執行,Executor.execute(Runnalbe) Executor在執行時使用內部的線程池完成操作。

              
          例子:
                        
          有一個很大的整數數組,需要求這個數組中所有整數的和,來計算結果。  
                      
          JDK 7 中的 Fork/Join模式可以解決該問題,http://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/)

              
          分析:
                       
          采用多線程(任務),并且還要分割List,每一小塊的數組采用一個線程(任務)進行計算其和,那么我們必須要等待所有的線程(任務)完成之后才能得到正確的結果.

          步驟:

          • 分割數組,根據采用的線程(任務)數平均分配,即array.length/threadCounts。
          • 定義一個記錄“很大數組”中所有整數和的變量sum,采用一個線程(任務)處理一個分割后的子數組,計算子數組中所有整數和(subSum),然后把和(subSum)累加到sum上。
          • 等待所有線程(任務)完成后輸出總和(sum)的值。

           

          /**
           * 并行計算數組的和, 測試類
           * 
           * 
          @author lsb
           *
           
          */

          public class MainTest {
              
          public static void main(String[] args) {
                  
          int[] numbers = new int[] 123456781011 };
                  CalcArrayTotal calc 
          = new CalcArrayTotal();
                  Long sum 
          = calc.sum(numbers);
                  System.out.println(sum);
                  calc.close();
              }

          }


          主要實現類:

          import java.util.concurrent.CompletionService;
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.ExecutorCompletionService;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          import com.li.senbiao.Thread.concurrent.test1.SumCalculator;

          public class CalcArrayTotal {

              
          private ExecutorService exec;

              
          private CompletionService<Long> completionService;

              
          private int cpuCoreNumber;

              
          public CalcArrayTotal() {
                  cpuCoreNumber 
          = Runtime.getRuntime().availableProcessors();
                  exec 
          = Executors.newFixedThreadPool(cpuCoreNumber);
                  completionService 
          = new ExecutorCompletionService<Long>(exec);
              }


              
          public Long sum(final int[] numbers) {
                  
          // 根據CPU核心個數拆分任務,創建FutureTask并提交到Executor
                  for (int i = 0; i < cpuCoreNumber; i++{
                      
          int increment = numbers.length / cpuCoreNumber + 1;
                      
          int start = increment * i;
                      
          int end = increment * i + increment;
                      
          if (end > numbers.length) {
                          end 
          = numbers.length;
                      }

                      SumCalculator subCalc 
          = new SumCalculator(numbers, start, end);
                      
          if (!exec.isShutdown()) {
                          
          /**
                           * 生產者 submit() 執行的 任務。使用者 take() 已完成的任務, 
                           * 并按照完成這些任務的順序處理它們的結果  。
                           * 也就是調用CompletionService 的 take 方法是,
                           * 會返回按完成順序放回任務的結果, CompletionService 內部維護了一個 阻塞隊列 BlockingQueue ,
                           * 如果沒有任務完成, take() 方法也會阻塞。
                           
          */

                          completionService.submit(subCalc);
                      }

                  }

                  
          return getResult();
              }


              
          /**
               * 迭代每個只任務,獲得部分和,相加返回
               
          */

              
          public Long getResult() {
                  Long result 
          = 0L;
                  
          for (int i = 0; i < cpuCoreNumber; i++{
                      
          try {
                          Long subSum 
          = completionService.take().get();
                          result 
          += subSum;
                      }
           catch (InterruptedException e) {
                          e.printStackTrace();
                      }
           catch (ExecutionException e) {
                          e.printStackTrace();
                      }

                  }

                  
          return result;
              }


              
          public void close() {
                  exec.shutdown();
              }


          }


          一組的計算和:

          import java.util.concurrent.Callable;

          /**
           * 一組計算和值 
           * 
           * 
          @author lsb
           *
           
          */

          public class SumCalculator implements Callable<Long> {

              
          private int[] numbers;

              
          private int start;

              
          private int end;

              
          public SumCalculator(final int[] numbers, int start, int end) {
                  
          this.numbers = numbers;
                  
          this.start = start;
                  
          this.end = end;
              }


              @Override
              
          public Long call() throws Exception {
                  Long sum 
          = 0L;
                  
          for (int i = start; i < end; i++{
                      sum 
          += numbers[i];
                  }

                  
          return sum;
              }

          }






           


               一、Executors創建線程池

                  Executors類,提供了一系列工廠方法用于創先線程池,返回的線程池都實現了ExecutorService接口。

                   // 創建固定數目線程的線程池
                  public static ExecutorService newFixedThreadPool(int nThreads)

                 
                  // 創建一個可緩存的線程池,調用execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

                  public static ExecutorService newCachedThreadPool() 
              
                  // 創建一個單線程化的Executor

                  public static ExecutorService newSingleThreadExecutor()

                 
                 //  創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類

                  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)



             二、ExecutorService 與生命周期

                ExecutorService 擴展了Executor 并添加了一些生命周期管理的方法。一個Executor 的生命周期有三種狀態,運行 ,關閉 ,終止 。Executor 創建時處于運行狀態。當調用ExecutorService.shutdown() 后,處于關閉狀態,isShutdown() 方法返 回true 。這時,不應該再想Executor 中添加任務,所有已添加的任務執行完畢后,Executor 處于終止狀態,isTerminated() 返 回true 。

             如果Executor 處于關閉狀態,往Executor 提交任務會拋出unchecked exception RejectedExecutionException 。

           

           

              三、使用Callable ,Future 返回結果

               Future<V> 代表一個異步執行的操作,通過get() 方法可以獲得操作的結果,如果異步操作還沒有完成,則,get() 會使當前 線程阻塞。FutureTask<V> 實現了Future<V> 和Runable<V> 。Callable 代表一個 有返回值得操作。

          ExecutoreService 提供了submit() 方法,傳遞一個Callable ,或Runnable ,返回Future 。如果Executor 后臺線程池還沒有完成Callable 的計算,這調用返回Future 對象的get() 方法,會阻塞直到計算完成。





              

          posted on 2011-10-12 17:25 胡鵬 閱讀(2444) 評論(0)  編輯  收藏 所屬分類: java基礎

          導航

          <2011年10月>
          2526272829301
          2345678
          9101112131415
          16171819202122
          23242526272829
          303112345

          統計

          常用鏈接

          留言簿(3)

          隨筆分類

          隨筆檔案

          agile

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 吉隆县| 海安县| 枝江市| 江华| 泰州市| 三都| 剑川县| 定西市| 介休市| 特克斯县| 沙湾县| 乌鲁木齐市| 深水埗区| 南丹县| 沛县| 奎屯市| 桃源县| 长春市| 宝应县| 安岳县| 浦东新区| 时尚| 监利县| 江口县| 津南区| 永嘉县| 雷州市| 桑日县| 军事| 肇州县| 泸州市| 赫章县| 玛曲县| 昂仁县| 陇南市| 博罗县| 修武县| 新丰县| 新干县| 凤台县| 民丰县|