xylz,imxylz

          關注后端架構、中間件、分布式和并發編程

             :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

          [本文地址:http://www.aygfsteel.com/Files/xylz/Inside.Java.Concurrency_35.ThreadPool.part8_Future.ScheduledThreadPoolExecutor.pdf]

           

          線程池任務執行結果

          這一節來探討下線程池中任務執行的結果以及如何阻塞線程、取消任務等等。

          1 package info.imxylz.study.concurrency.future;
          2 
          3 public class SleepForResultDemo implements Runnable {
          4 
          5     static boolean result = false;
          6 
          7     static void sleepWhile(long ms) {
          8         try {
          9             Thread.sleep(ms);
          10         } catch (Exception e) {}
          11     }
          12 
          13     @Override
          14     public void run() {
          15         //do work
          16         System.out.println("Hello, sleep a while.");
          17         sleepWhile(2000L);
          18         result = true;
          19     }
          20 
          21     public static void main(String[] args) {
          22         SleepForResultDemo demo = new SleepForResultDemo();
          23         Thread t = new Thread(demo);
          24         t.start();
          25         sleepWhile(3000L);
          26         System.out.println(result);
          27     }
          28 
          29 }
          30 

          在沒有線程池的時代里面,使用Thread.sleep(long)去獲取線程執行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務可能的執行時間,并且還會阻塞主線程,不管任務有沒有執行完畢。

          1 package info.imxylz.study.concurrency.future;
          2 
          3 public class SleepLoopForResultDemo implements Runnable {
          4 
          5     boolean result = false;
          6 
          7     volatile boolean finished = false;
          8 
          9     static void sleepWhile(long ms) {
          10         try {
          11             Thread.sleep(ms);
          12         } catch (Exception e) {}
          13     }
          14 
          15     @Override
          16     public void run() {
          17         //do work
          18         try {
          19             System.out.println("Hello, sleep a while.");
          20             sleepWhile(2000L);
          21             result = true;
          22         } finally {
          23             finished = true;
          24         }
          25     }
          26 
          27     public static void main(String[] args) {
          28         SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
          29         Thread t = new Thread(demo);
          30         t.start();
          31         while (!demo.finished) {
          32             sleepWhile(10L);
          33         }
          34         System.out.println(demo.result);
          35     }
          36 
          37 }
          38 

          使用volatile與while死循環的好處就是等待的時間可以稍微小一點,但是依然有CPU負載高并且阻塞主線程的問題。最簡單的降低CPU負載的方式就是使用Thread.join().

                  SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
                  Thread t
          = new Thread(demo);
                  t.start();
                  t.join();
                  System.out.println(demo.result);

          顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質上講就是利用while和wait來實現的。

          上面的方式中都存在一個問題,那就是會阻塞主線程并且任務不能被取消。為了解決這個問題,線程池中提供了一個Future接口。

          ThreadPoolExecutor-Future

          在Future接口中提供了5個方法。

          • V get() throws InterruptedException, ExecutionException: 等待計算完成,然后獲取其結果。
          • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。
          • boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。
          • boolean isCancelled():如果在任務正常完成前將其取消,則返回 true
          • boolean isDone():如果任務已完成,則返回 true。 可能由于正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回 true

          API看起來容易,來研究下異常吧。get()請求獲取一個結果會阻塞當前進程,并且可能拋出以下三種異常:

          • InterruptedException:執行任務的線程被中斷則會拋出此異常,此時不能知道任務是否執行完畢,因此其結果是無用的,必須處理此異常。
          • ExecutionException:任務執行過程中(Runnable#run())方法可能拋出RuntimeException,如果提交的是一個java.util.concurrent.Callable<V>接口任務,那么java.util.concurrent.Callable.call()方法有可能拋出任意異常。
          • CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務被取消了但是依然去獲取結果。

          對于get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,由于有超時機制,因此還可能得到一個TimeoutException。

          boolean cancel(boolean mayInterruptIfRunning)方法比較復雜,各種情況比較多:

          1. 如果任務已經執行完畢,那么返回false。
          2. 如果任務已經取消,那么返回false。
          3. 循環直到設置任務為取消狀態,對于未啟動的任務將永遠不再執行,對于正在運行的任務,將根據mayInterruptIfRunning是否中斷其運行,如果不中斷那么任務將繼續運行直到結束。
          4. 此方法返回后任務要么處于運行結束狀態,要么處于取消狀態。isDone()將永遠返回true,如果cancel()方法返回true,isCancelled()始終返回true。

          來看看Future接口的實現類java.util.concurrent.FutureTask<V>具體是如何操作的。

          在FutureTask中使用了一個AQS數據結構來完成各種狀態以及加鎖、阻塞的實現。

          在此AQS類java.util.concurrent.FutureTask.Sync中一個任務用4中狀態:

          ThreadPoolExecutor-FutureTask-state

          初始情況下任務狀態state=0,任務執行(innerRun)后狀態變為運行狀態RUNNING(state=1),執行完畢后變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消后就變為狀態CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態變更的。

                  void innerRun() {
                     
          if (!compareAndSetState(0, RUNNING))
                         
          return;
                     
          try {
                          runner
          = Thread.currentThread();
                         
          if (getState() == RUNNING) // recheck after setting thread
                              innerSet(callable.call());
                         
          else
                              releaseShared(
          0); // cancel
                      } catch (Throwable ex) {
                          innerSetException(ex);
                      }
                  }

          執行一個任務有四步:設置運行狀態、設置當前線程(AQS需要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這里也可以看到,一個任務只能執行一次,因為執行完畢后它的狀態不在為初始值0,要么為CANCELLED,要么為RAN。

          取消一個任務(cancel)又是怎樣進行的呢?對比下前面取消任務的描述是不是很簡單,這里無非利用AQS的狀態來改變任務的執行狀態,最終達到放棄未啟動或者正在執行的任務的目的。

          boolean innerCancel(boolean mayInterruptIfRunning) {
             
          for (;;) {
                 
          int s = getState();
                 
          if (ranOrCancelled(s))
                     
          return false;
                 
          if (compareAndSetState(s, CANCELLED))
                     
          break;
              }
             
          if (mayInterruptIfRunning) {
                  Thread r
          = runner;
                 
          if (r != null)
                      r.interrupt();
              }
              releaseShared(
          0);
              done();
             
          return true;
          }

          到目前為止我們依然沒有說明到底是如何阻塞獲取一個結果的。下面四段代碼描述了這個過程。

          1     V innerGet() throws InterruptedException, ExecutionException {
          2         acquireSharedInterruptibly(0);
          3         if (getState() == CANCELLED)
          4             throw new CancellationException();
          5         if (exception != null)
          6             throw new ExecutionException(exception);
          7         return result;
          8     }
          9     //AQS#acquireSharedInterruptibly
          10     public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
          11         if (Thread.interrupted())
          12             throw new InterruptedException();
          13         if (tryAcquireShared(arg) < 0)
          14             doAcquireSharedInterruptibly(arg); //park current Thread for result
          15     }
          16     protected int tryAcquireShared(int ignore) {
          17         return innerIsDone()? 1 : -1;
          18     }
          19 
          20     boolean innerIsDone() {
          21         return ranOrCancelled(getState()) && runner == null;
          22     }

          當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這里獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。如果不滿足條件,那么在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。

          至于將Runnable接口轉換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現。

              static final class RunnableAdapter<T> implements Callable<T> {
                 
          final Runnable task;
                 
          final T result;
                  RunnableAdapter(Runnable  task, T result) {
                     
          this.task = task;
                     
          this.result = result;
                  }
                 
          public T call() {
                      task.run();
                     
          return result;
                  }
              }

          延遲、周期性任務調度的實現

          java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、周期性任務調度的實現。

          有了整個線程池的實現,再回頭來看延遲、周期性任務調度的實現應該就很簡單了,因為所謂的延遲、周期性任務調度,無非添加一系列有序的任務隊列,然后按照執行順序的先后來處理整個任務隊列。如果是周期性任務,那么在執行完畢的時候加入下一個時間點的任務即可。

          由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區別在于任務是有序(按照執行時間順序)的,并且需要到達時間點(臨界點)才能執行,并不是任務隊列中有任務就需要執行的。也就是說唯一不同的就是任務隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現。

          DelayQueue是基于有序隊列PriorityQueue實現的。PriorityQueue 也叫優先級隊列,按照自然順序對元素進行排序,類似于TreeMap/Collections.sort一樣。

          同樣是有序隊列,DelayQueue和PriorityQueue區別在什么地方?

          由于DelayQueue在獲取元素時需要檢測元素是否“可用”,也就是任務是否達到“臨界點”(指定時間點),因此加入元素和移除元素會有一些額外的操作。

          典型的,移除元素需要檢測元素是否達到“臨界點”,增加元素的時候如果有一個元素比“頭元素”更早達到臨界點,那么就需要通知任務隊列。因此這需要一個條件變量final Condition available 。

          移除元素(出隊列)的過程是這樣的:

          • 總是檢測隊列的頭元素(順序最小元素,也是最先達到臨界點的元素)
          • 檢測頭元素與當前時間的差,如果大于0,表示還未到底臨界點,因此等待響應時間(使用條件變量available)
          • 如果小于或者等于0,說明已經到底臨界點或者已經過了臨界點,那么就移除頭元素,并且喚醒其它等待任務隊列的線程。
              public E take() throws InterruptedException {
                 
          final ReentrantLock lock = this.lock;
                  lock.lockInterruptibly();
                 
          try {
                     
          for (;;) {
                          E first
          = q.peek();
                         
          if (first == null) {
                              available.await();
                          }
          else {
                             
          long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                             
          if (delay > 0) {
                                 
          long tl = available.awaitNanos(delay);
                              }
          else {
                                  E x
          = q.poll();
                                 
          assert x != null;
                                 
          if (q.size() != 0)
                                      available.signalAll();
          // wake up other takers
                                  return x;

                              }
                          }
                      }
                  }
          finally {
                      lock.unlock();
                  }
              }

          同樣加入元素也會有相應的條件變量操作。當前僅當隊列為空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒“等待線程”去檢測元素。因為頭元素都沒有喚醒那么比頭元素更延遲的元素就更加不會喚醒。

              public boolean offer(E e) {
                 
          final ReentrantLock lock = this.lock;
                  lock.lock();
                 
          try {
                      E first
          = q.peek();
                      q.offer(e);
                     
          if (first == null || e.compareTo(first) < 0)
                          available.signalAll();
                     
          return true;
                  }
          finally {
                      lock.unlock();
                  }
              }

          有了任務隊列后再來看Future在ScheduledThreadPoolExecutor中是如何操作的。

          java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是繼承java.util.concurrent.FutureTask<V>的,區別在于執行任務是否是周期性的。

                  private void runPeriodic() {
                     
          boolean ok = ScheduledFutureTask.super.runAndReset();
                     
          boolean down = isShutdown();
                     
          // Reschedule if not cancelled and not shutdown or policy allows
                      if (ok && (!down ||
                                 (getContinueExistingPeriodicTasksAfterShutdownPolicy()
          &&
                                 
          !isStopped()))) {
                         
          long p = period;
                         
          if (p > 0)
                              time
          += p;
                         
          else
                              time
          = now() - p;
                          ScheduledThreadPoolExecutor.
          super.getQueue().add(this);
                      }
                     
          // This might have been the final executed delayed
                     
          // task.  Wake up threads to check.
                      else if (down)
                          interruptIdleWorkers();
                  }

                 
          /**
                   * Overrides FutureTask version so as to reset/requeue if periodic.
                  
          */
                 
          public void run() {
                     
          if (isPeriodic())
                          runPeriodic();
                     
          else
                          ScheduledFutureTask.
          super.run();
                  }
              }

          如果不是周期性任務調度,那么就和java.util.concurrent.FutureTask.Sync的調度方式是一樣的。如果是周期性任務(isPeriodic())那么就稍微有所不同的。

          ScheduledThreadPoolExecutor-ScheduledFutureTask

          先從功能/結構上分析下。第一種情況假設提交的任務每次執行花費10s,間隔(delay/period)為20s,對于scheduleAtFixedRate而言,每次執行開始時間20s,對于scheduleWithFixedDelay來說每次執行開始時間30s。第二種情況假設提交的任務每次執行時間花費20s,間隔(delay/period)為10s,對于scheduleAtFixedRate而言,每次執行開始時間10s,對于scheduleWithFixedDelay來說每次執行開始時間30s。(具體分析可以參考這里

          也就是說scheduleWithFixedDelay的執行開始時間為(delay+cost),而對于scheduleAtFixedRate來說執行開始時間為max(period,cost)。

          回頭再來看上面源碼runPeriodic()就很容易了。但特別要提醒的,如果任務的任何一個執行遇到異常,則后續執行都會被取消,這從runPeriodic()就能看出。要強調的第二點就是同一個周期性任務不會被同時執行。就比如說盡管上面第二種情況的scheduleAtFixedRate任務每隔10s執行到達一個時間點,但是由于每次執行時間花費為20s,因此每次執行間隔為20s,只不過執行的任務次數會多一點。但從本質上講就是每隔20s執行一次,如果任務隊列不取消的話。

          為什么不會同時執行?

          這是因為ScheduledFutureTask執行的時候會將任務從隊列中移除來,執行完畢以后才會添加下一個同序列的任務,因此任務隊列中其實最多只有同序列的任務的一份副本,所以永遠不會同時執行(盡管要執行的時間在過去)。

           

          ScheduledThreadPoolExecutor使用一個無界(容量無限,整數的最大值)的容器(DelayedWorkQueue隊列),根據ThreadPoolExecutor的原理,只要當容器滿的時候才會啟動一個大于corePoolSize的線程數。因此實際上ScheduledThreadPoolExecutor是一個固定線程大小的線程池,固定大小為corePoolSize,構造函數里面的Integer.MAX_VALUE其實是不生效的(盡管PriorityQueue使用數組實現有PriorityQueue大小限制,如果你的任務數超過了2147483647就會導致OutOfMemoryError,這個參考PriorityQueue的grow方法)。

           

          再回頭看scheduleAtFixedRate等方法就容易多了。無非就是往任務隊列中添加一個未來某一時刻的ScheduledFutureTask任務,如果是scheduleAtFixedRate那么period/delay就是正數,如果是scheduleWithFixedDelay那么period/delay就是一個負數,如果是0那么就是一次性任務。直接調用父類ThreadPoolExecutor的execute/submit等方法就相當于period/delay是0,并且initialDelay也是0。

              public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                           
          long initialDelay,
                                                           
          long period,
                                                            TimeUnit unit) {
                 
          if (command == null || unit == null)
                     
          throw new NullPointerException();
                 
          if (period <= 0)
                     
          throw new IllegalArgumentException();
                 
          if (initialDelay < 0) initialDelay = 0;
                 
          long triggerTime = now() + unit.toNanos(initialDelay);
                  RunnableScheduledFuture
          <?> t = decorateTask(command,
                     
          new ScheduledFutureTask<Object>(command,
                                                     
          null,
                                                      triggerTime,
                                                      unit.toNanos(period)));
                  delayedExecute(t);
                 
          return t;
              }

          另外需要補充說明的一點,前面說過java.util.concurrent.FutureTask.Sync任務只能執行一次,那么在runPeriodic()里面怎么又將執行過的任務加入隊列中呢?這是因為java.util.concurrent.FutureTask.Sync提供了一個innerRunAndReset()方法,此方法不僅執行任務還將任務的狀態還原成0(初始狀態)了,所以此任務就可以重復執行。這就是為什么runPeriodic()里面調用runAndRest()的緣故。

                  boolean innerRunAndReset() {
                     
          if (!compareAndSetState(0, RUNNING))
                         
          return false;
                     
          try {
                          runner
          = Thread.currentThread();
                         
          if (getState() == RUNNING)
                              callable.call();
          // don't set result
                          runner = null;
                         
          return compareAndSetState(RUNNING, 0);
                      }
          catch (Throwable ex) {
                          innerSetException(ex);
                         
          return false;
                      }
                  }

           

          后話

          整個并發實踐原理和實現(源碼)上的東西都講完了,后面幾個小節是一些總結和掃尾的工作,包括超時機制、異常處理等一些細節問題。也就是說大部分只需要搬出一些理論和最佳實踐知識出來就好了,不會有大量費腦筋的算法分析和原理、思想探討之類的。后面的章節也會加快一些進度。

          老實說從剛開始的好奇到中間的興奮,再到現在的徹悟,收獲還是很多,個人覺得這是最認真、最努力也是自我最滿意的一次技術研究和探討,同時在這個過程中將很多技術細節都串聯起來了,慢慢就有了那種技術相通的感覺。原來有了理論以后再去實踐、再去分析問題、解決問題和那種純解決問題得到的經驗完全不一樣。整個專輯下來不僅僅是并發包這一點點知識,設計到硬件、軟件、操作系統、網絡、安全、性能、算法、理論等等,總的來說這也算是一次比較成功的研究切入點,這比Guice那次探討要深入和持久的多。

          --



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2011-02-13 20:21 imxylz 閱讀(11299) 評論(6)  編輯  收藏 所屬分類: Java Concurrency

          評論

          # re: 深入淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3) 2011-03-30 15:37 現金流量表的編制方法
          教程些的真好  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3) 2011-03-31 20:04 tb
          請教你個問題,ThreadPoolExecutor可以作為servlet的成員變量使用嗎?會不會有多線程不安全的問題?  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3) 2011-04-01 09:47 xylz
          @tb
          Servlet本身是單例的,另外ThreadPoolExecutor本身就是為了處理多線程并發的,所以接口都是無狀態的,不會有多線程不安全的問題。當然了,如果Servlet持有ThreadPoolExecutor的狀態、中間結果等,那就有線程安全的問題了。
          關于線程安全,看這篇文章 http://www.aygfsteel.com/xylz/archive/2010/07/03/325168.html  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3) 2011-05-28 17:33 devoleper
          把作者的整個專輯都看下來了,感覺受益很多。以前自己研究源碼沒有將分析過程記錄下來,這樣就沒有整體的把握,這個習慣自己要改掉。以后要多多向作者學習,研究精神、共享、貢獻精神。非常感謝!  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3)[未登錄] 2011-09-16 20:46 xxx
          你說 對于scheduleAtFixedRate來說執行開始時間為max(period,cost)。但是你的圖上好像是 min(period,cost)。不知道是不是我理解錯了?  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (35): 線程池 part 8 線程池的實現及原理 (3) 2011-09-18 10:51 xylz
          @xxx

          這是沒有問題的。圖下面有一段文字描述。
          “就比如說盡管上面第二種情況的scheduleAtFixedRate任務每隔10s執行到達一個時間點,但是由于每次執行時間花費為20s,因此每次執行間隔為20s,只不過執行的任務次數會多一點。但從本質上講就是每隔20s執行一次,如果任務隊列不取消的話。”  回復  更多評論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 汾西县| 黄平县| 砀山县| 长治县| 朝阳市| 合水县| 奉贤区| 惠水县| 慈溪市| 泸州市| 鄯善县| 自治县| 黑山县| 景德镇市| 清远市| 司法| 青龙| 吉木萨尔县| 龙胜| 石阡县| 叙永县| 桐柏县| 湖州市| 晋州市| 务川| 南部县| 兴安县| 太和县| 丰原市| 九寨沟县| 凤冈县| 马鞍山市| 和顺县| 营山县| 峨边| 兴化市| 淮滨县| 武定县| 三江| 连江县| 汕尾市|