最近的工作需要統計一些復雜的報表,為了提高效率,想用多線程去實現,但要在所有線程完成統計任務后,將結果匯總。所以在思考有沒有什么辦法解決,之所以是“系列一”是因為我想記錄下我的思考過程。
1、首先設計一個Executer,負責任務的執行和匯總:
Java代碼
- public class Executer {
- //計算已經派發的任務數(條件謂詞)
- public static int THREAD_COUNT = 0;
- //線程池
- private Executor pool = null;
- public Executer() {
- this(1);
- }
- public Executer(int threadPoolSize) {
- pool = Executors.newFixedThreadPool(threadPoolSize);
- }
- /**
- * 任務派發
- * @param job
- */
- public void fork(Job job){
- //將任務派發給線程池去執行
- pool.execute(job);
- THREAD_COUNT++;
- }
- /**
- * 統計任務結果
- */
- public void join(){
- while(THREAD_COUNT > 0){
- System.out.println("threadCount: "+THREAD_COUNT);
- try {
- wait();//如果任務沒有全部完成,則掛起
- } catch (Exception e) {}//這里總是拋異常,不知道為什么,好吧!先不管它
- }
- }
- }
2、寫一個抽象的Job類,負責執行具體的任務
Java代碼
- public abstract class Job implements Runnable {
- @Override
- public void run() {
- this.execute();//執行子類具體任務
- Executer.THREAD_COUNT--;
- try{
- notifyAll();//這里總是拋異常,不知道為什么,好吧!先不管它
- }catch(Exception e){}
- }
- /**
- * 業務處理函數
- */
- public abstract void execute();
- }
3、測試,先來一個具體的任務實現。
Java代碼
- public class MyJob extends Job {
- @Override
- public void execute() {
- //模擬業務需要處理1秒.
- try {Thread.sleep(1000);} catch (InterruptedException e) {}
- System.out.println("running thread id = "+Thread.currentThread().getId());
- }
- }
4、測試。
Java代碼
- public class Test {
- public static void main(String[] args) {
- //初始化任務池
- Executer exe = new Executer(5);
- //初始化任務
- long time = System.currentTimeMillis();
- for (int i = 0; i < 10; i++) {
- MyJob job = new MyJob();
- exe.fork(job);//派發任務
- }
- //匯總任務結果
- exe.join();
- System.out.println("time: "+(System.currentTimeMillis() - time));
- }
- }
5、好吧,看一下結果
Java代碼
- threadCount: 10
- ......(表示有N多個)
- threadCount: 10
- running thread id = 8
- running thread id = 9
- running thread id = 11
- running thread id = 10
- running thread id = 12
- threadCount: 5
- ......(表示有N多個)
- threadCount: 5
- running thread id = 9
- running thread id = 10
- running thread id = 12
- running thread id = 8
- running thread id = 11
- threadCount: 3
- time: 2032
哈哈,看來是可以了,最后匯總任務的處理時間是2032毫秒,看來是比單個任務順序執行來的快。但是有幾個問題:
1)如果沒有catch那個超級Exception的話,就會拋下面的異常:
Java代碼
- java.lang.IllegalMonitorStateException
- at java.lang.Object.wait(Native Method)
- at java.lang.Object.wait(Object.java:485)
- at com.one.Executer.join(Executer.java:38)
- at com.test.Test.main(Test.java:21)
2)為啥會打印N多個同樣值threadCount呢?
于是和同事(河東)溝通,他說wait要放在synchronized里面才行,好吧,試一下,改進一下Executer和Job
Java代碼
- public class Executer {
- //計算已經派發的任務數(條件謂詞)
- public static int THREAD_COUNT = 0;
- //條件隊列鎖
- public static final Object LOCK = new Object();
- //線程池
- private Executor pool = null;
- public Executer() {
- this(1);
- }
- public Executer(int threadPoolSize) {
- pool = Executors.newFixedThreadPool(threadPoolSize);
- }
- /**
- * 任務派發
- * @param job
- */
- public void fork(Job job){
- //將任務派發給線程池去執行
- pool.execute(job);
- //增加線程數
- synchronized (LOCK) {
- THREAD_COUNT++;
- }
- }
- /**
- * 統計任務結果
- */
- public void join(){
- synchronized (LOCK) {
- while(THREAD_COUNT > 0){
- System.out.println("threadCount: "+THREAD_COUNT);
- try {
- LOCK.wait();//如果任務沒有全部完成,則掛起
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
Java代碼
- public abstract class Job implements Runnable {
- @Override
- public void run() {
- this.execute();//執行子類具體任務
- synchronized (Executer.LOCK) {
- //處理完業務后,任務結束,遞減線程數,同時喚醒主線程
- Executer.THREAD_COUNT--;
- Executer.LOCK.notifyAll();
- }
- }
- /**
- * 業務處理函數
- */
- public abstract void execute();
- }
6、測試一下:
Java代碼
- threadCount: 10
- running thread id = 8
- running thread id = 11
- running thread id = 9
- threadCount: 7
- running thread id = 10
- threadCount: 6
- running thread id = 12
- threadCount: 5
- running thread id = 11
- running thread id = 12
- running thread id = 10
- threadCount: 2
- running thread id = 9
- running thread id = 8
- threadCount: 1
- time: 2016
還真的行,謝謝河東哈!
但是原因是什么呢?回去查了查書《Java并發編程實踐》,見附件!
Java代碼
- 第14.2.1節這樣說:
- 在條件等待中存在一種重要的三元關系,包括加鎖、wait方法和一個條件謂詞。在條件謂詞中包含多個變量,而狀態變量由一個鎖來保護,因此在測試條件謂詞之前必須先持有這個鎖。鎖對象與條件隊列對象(即調用wait和notify等方法所在的對象)必須是同一個對象。
- ...
- 由于線程在條件謂詞不為真的情況下也可以反復地醒來,因此必須在一個循環中調用wait,并在每次迭代中都測試條件謂詞。
- 14.2.4節:
- 由于在調用notify或notifyAll時必須持有條件隊列對象的鎖,而如果這些等待中線程此時不能重新獲得鎖,那么無法從wait返回,因此發出通知的線程應該盡快地釋放,從而確保正在等待的線程盡可能盡快的解除阻塞。
看來之前是不會用wait和notify,哈哈~!
感謝河東,和你交流收獲很大!
順便測試一下java多線程情況下,多核CPU的利用率,修改上面的線程池大小和任務數(2個線程處理1000000個任務,去掉MyJob的sleep(這樣可以多搶些CPU時間),結果如下:
看來window下是可以利用多核的,雖然是一個JVM進程。之前和斯亮討論的結論是錯誤的。