posts - 101,  comments - 29,  trackbacks - 0

          最近的工作需要統計一些復雜的報表,為了提高效率,想用多線程去實現,但要在所有線程完成統計任務后,將結果匯總。所以在思考有沒有什么辦法解決,之所以是“系列一”是因為我想記錄下我的思考過程。

          1、首先設計一個Executer,負責任務的執行和匯總:

          Java代碼  收藏代碼
          1. public class Executer {  
          2.     //計算已經派發的任務數(條件謂詞)  
          3.     public static int THREAD_COUNT = 0;  
          4.     //線程池  
          5.     private Executor pool = null;  
          6.     public Executer() {  
          7.         this(1);  
          8.     }  
          9.     public Executer(int threadPoolSize) {  
          10.         pool = Executors.newFixedThreadPool(threadPoolSize);  
          11.     }  
          12.     /** 
          13.      * 任務派發 
          14.      * @param job 
          15.      */  
          16.     public void fork(Job job){  
          17.         //將任務派發給線程池去執行  
          18.         pool.execute(job);  
          19.         THREAD_COUNT++;  
          20.     }  
          21.     /** 
          22.      * 統計任務結果 
          23.      */  
          24.     public void join(){  
          25.         while(THREAD_COUNT > 0){  
          26.             System.out.println("threadCount: "+THREAD_COUNT);  
          27.             try {  
          28.                 wait();//如果任務沒有全部完成,則掛起  
          29.             } catch (Exception e) {}//這里總是拋異常,不知道為什么,好吧!先不管它  
          30.         }  
          31.     }  
          32. }  

           2、寫一個抽象的Job類,負責執行具體的任務

          Java代碼  收藏代碼
          1. public abstract class Job implements Runnable {  
          2.   
          3.     @Override  
          4.     public void run() {  
          5.         this.execute();//執行子類具體任務  
          6.         Executer.THREAD_COUNT--;  
          7.         try{  
          8.             notifyAll();//這里總是拋異常,不知道為什么,好吧!先不管它  
          9.         }catch(Exception e){}  
          10.     }  
          11.     /** 
          12.      * 業務處理函數 
          13.      */  
          14.     public abstract void execute();  
          15.   
          16. }  

           

          3、測試,先來一個具體的任務實現。

          Java代碼  收藏代碼
          1. public class MyJob extends Job {  
          2.   
          3.     @Override  
          4.     public void execute() {  
          5.         //模擬業務需要處理1秒.  
          6.         try {Thread.sleep(1000);} catch (InterruptedException e) {}  
          7.         System.out.println("running thread id = "+Thread.currentThread().getId());  
          8.     }  
          9.   
          10. }  

           

          4、測試。

          Java代碼  收藏代碼
          1. public class Test {  
          2.     public static void main(String[] args) {  
          3.         //初始化任務池  
          4.         Executer exe = new Executer(5);  
          5.         //初始化任務  
          6.         long time = System.currentTimeMillis();  
          7.         for (int i = 0; i < 10; i++) {  
          8.             MyJob job = new MyJob();  
          9.             exe.fork(job);//派發任務  
          10.         }  
          11.         //匯總任務結果  
          12.         exe.join();  
          13.         System.out.println("time: "+(System.currentTimeMillis() - time));  
          14.     }  
          15.   
          16. }  

           

           5、好吧,看一下結果

           

          Java代碼  收藏代碼
          1. threadCount: 10  
          2. ......(表示有N多個)  
          3. threadCount: 10  
          4. running thread id = 8  
          5. running thread id = 9  
          6. running thread id = 11  
          7. running thread id = 10  
          8. running thread id = 12  
          9. threadCount: 5  
          10. ......(表示有N多個)  
          11. threadCount: 5  
          12. running thread id = 9  
          13. running thread id = 10  
          14. running thread id = 12  
          15. running thread id = 8  
          16. running thread id = 11  
          17. threadCount: 3  
          18. time: 2032  

           哈哈,看來是可以了,最后匯總任務的處理時間是2032毫秒,看來是比單個任務順序執行來的快。但是有幾個問題:

          1)如果沒有catch那個超級Exception的話,就會拋下面的異常:

          Java代碼  收藏代碼
          1. java.lang.IllegalMonitorStateException  
          2.     at java.lang.Object.wait(Native Method)  
          3.     at java.lang.Object.wait(Object.java:485)  
          4.     at com.one.Executer.join(Executer.java:38)  
          5.     at com.test.Test.main(Test.java:21)  

           

          2)為啥會打印N多個同樣值threadCount呢?

          于是和同事(河東)溝通,他說wait要放在synchronized里面才行,好吧,試一下,改進一下Executer和Job

           

          Java代碼  收藏代碼
          1. public class Executer {  
          2.     //計算已經派發的任務數(條件謂詞)  
          3.     public static int THREAD_COUNT = 0;  
          4.     //條件隊列鎖  
          5.     public static final Object LOCK = new Object();  
          6.     //線程池  
          7.     private Executor pool = null;  
          8.     public Executer() {  
          9.         this(1);  
          10.     }  
          11.     public Executer(int threadPoolSize) {  
          12.         pool = Executors.newFixedThreadPool(threadPoolSize);  
          13.     }  
          14.     /** 
          15.      * 任務派發 
          16.      * @param job 
          17.      */  
          18.     public void fork(Job job){  
          19.         //將任務派發給線程池去執行  
          20.         pool.execute(job);  
          21.         //增加線程數  
          22.         synchronized (LOCK) {  
          23.             THREAD_COUNT++;  
          24.         }  
          25.     }  
          26.     /** 
          27.      * 統計任務結果 
          28.      */  
          29.     public void join(){  
          30.         synchronized (LOCK) {  
          31.             while(THREAD_COUNT > 0){  
          32.                 System.out.println("threadCount: "+THREAD_COUNT);  
          33.                 try {  
          34.                     LOCK.wait();//如果任務沒有全部完成,則掛起  
          35.                 } catch (InterruptedException e) {  
          36.                     e.printStackTrace();  
          37.                 }  
          38.             }  
          39.         }  
          40.     }  
          41. }  

           

          Java代碼  收藏代碼
          1. public abstract class Job implements Runnable {  
          2.   
          3.     @Override  
          4.     public void run() {  
          5.         this.execute();//執行子類具體任務  
          6.         synchronized (Executer.LOCK) {  
          7.             //處理完業務后,任務結束,遞減線程數,同時喚醒主線程  
          8.             Executer.THREAD_COUNT--;  
          9.             Executer.LOCK.notifyAll();  
          10.         }  
          11.     }  
          12.     /** 
          13.      * 業務處理函數 
          14.      */  
          15.     public abstract void execute();  
          16.   
          17. }  

           6、測試一下:

          Java代碼  收藏代碼
          1. threadCount: 10  
          2. running thread id = 8  
          3. running thread id = 11  
          4. running thread id = 9  
          5. threadCount: 7  
          6. running thread id = 10  
          7. threadCount: 6  
          8. running thread id = 12  
          9. threadCount: 5  
          10. running thread id = 11  
          11. running thread id = 12  
          12. running thread id = 10  
          13. threadCount: 2  
          14. running thread id = 9  
          15. running thread id = 8  
          16. threadCount: 1  
          17. time: 2016  

           還真的行,謝謝河東哈!

          但是原因是什么呢?回去查了查書《Java并發編程實踐》,見附件!

          Java代碼  收藏代碼
          1. 14.2.1節這樣說:  
          2.   
          3. 在條件等待中存在一種重要的三元關系,包括加鎖、wait方法和一個條件謂詞。在條件謂詞中包含多個變量,而狀態變量由一個鎖來保護,因此在測試條件謂詞之前必須先持有這個鎖。鎖對象與條件隊列對象(即調用wait和notify等方法所在的對象)必須是同一個對象。  
          4.   
          5. ...  
          6.   
          7. 由于線程在條件謂詞不為真的情況下也可以反復地醒來,因此必須在一個循環中調用wait,并在每次迭代中都測試條件謂詞。  
          8.   
          9. 14.2.4節:  
          10.   
          11. 由于在調用notify或notifyAll時必須持有條件隊列對象的鎖,而如果這些等待中線程此時不能重新獲得鎖,那么無法從wait返回,因此發出通知的線程應該盡快地釋放,從而確保正在等待的線程盡可能盡快的解除阻塞。  

           

          看來之前是不會用wait和notify,哈哈~!

           

          感謝河東,和你交流收獲很大!

           

          順便測試一下java多線程情況下,多核CPU的利用率,修改上面的線程池大小和任務數(2個線程處理1000000個任務,去掉MyJob的sleep(這樣可以多搶些CPU時間),結果如下:

           

          看來window下是可以利用多核的,雖然是一個JVM進程。之前和斯亮討論的結論是錯誤的。

          posted on 2012-07-15 01:20 mixer-a 閱讀(3785) 評論(2)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 东港市| 马龙县| 广汉市| 通海县| 红原县| 安岳县| 永安市| 基隆市| 原阳县| 双辽市| 常德市| 东光县| 社旗县| 河津市| 北宁市| 连云港市| 德江县| 班戈县| 密山市| 平湖市| 扎兰屯市| 湖口县| 稷山县| 盐池县| 元朗区| 夹江县| 新田县| 丹江口市| 新巴尔虎右旗| 阿图什市| 清苑县| 伊宁县| 莱阳市| 托克逊县| 利津县| 琼结县| 江都市| 慈利县| 湟中县| 怀柔区| 河南省|