xylz,imxylz

          關(guān)注后端架構(gòu)、中間件、分布式和并發(fā)編程

             :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評(píng)論 :: 0 Trackbacks

          ConcurrentLinkedQueue是Queue的一個(gè)線程安全實(shí)現(xiàn)。先來看一段文檔說明。

          一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列。此隊(duì)列按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。隊(duì)列的頭部 是隊(duì)列中時(shí)間最長的元素。隊(duì)列的尾部 是隊(duì)列中時(shí)間最短的元素。新的元素插入到隊(duì)列的尾部,隊(duì)列獲取操作從隊(duì)列頭部獲得元素。當(dāng)多個(gè)線程共享訪問一個(gè)公共 collection 時(shí),ConcurrentLinkedQueue 是一個(gè)恰當(dāng)?shù)倪x擇。此隊(duì)列不允許使用 null 元素。

           

          由于ConcurrentLinkedQueue只是簡單的實(shí)現(xiàn)了一個(gè)隊(duì)列Queue,因此從API的角度講,沒有多少值的介紹,使用起來也很簡單,和前面遇到的所有FIFO隊(duì)列都類似。出隊(duì)列只能操作頭節(jié)點(diǎn),入隊(duì)列只能操作尾節(jié)點(diǎn),任意節(jié)點(diǎn)操作就需要遍歷完整的隊(duì)列。

          重點(diǎn)放在解釋ConcurrentLinkedQueue的原理和實(shí)現(xiàn)上。

           

          在繼續(xù)探討之前,結(jié)合前面線程安全的相關(guān)知識(shí),我來分析設(shè)計(jì)一個(gè)線程安全的隊(duì)列哪幾種方法。

          第一種:使用synchronized同步隊(duì)列,就像Vector或者Collections.synchronizedList/Collection那樣。顯然這不是一個(gè)好的并發(fā)隊(duì)列,這會(huì)導(dǎo)致吞吐量急劇下降。

          第二種:使用Lock。一種好的實(shí)現(xiàn)方式是使用ReentrantReadWriteLock來代替ReentrantLock提高讀取的吞吐量。但是顯然ReentrantReadWriteLock的實(shí)現(xiàn)更為復(fù)雜,而且更容易導(dǎo)致出現(xiàn)問題,另外也不是一種通用的實(shí)現(xiàn)方式,因?yàn)镽eentrantReadWriteLock適合哪種讀取量遠(yuǎn)遠(yuǎn)大于寫入量的場合。當(dāng)然了ReentrantLock是一種很好的實(shí)現(xiàn),結(jié)合Condition能夠很方便的實(shí)現(xiàn)阻塞功能,這在后面介紹BlockingQueue的時(shí)候會(huì)具體分析。

          第三種:使用CAS操作。盡管Lock的實(shí)現(xiàn)也用到了CAS操作,但是畢竟是間接操作,而且會(huì)導(dǎo)致線程掛起。一個(gè)好的并發(fā)隊(duì)列就是采用某種非阻塞算法來取得最大的吞吐量。

          ConcurrentLinkedQueue采用的就是第三種策略。它采用了參考資料1 中的算法。

          在鎖機(jī)制中談到過,要使用非阻塞算法來完成隊(duì)列操作,那么就需要一種“循環(huán)嘗試”的動(dòng)作,就是循環(huán)操作隊(duì)列,直到成功為止,失敗就會(huì)再次嘗試。這在前面的章節(jié)中多次介紹過。

           

          針對(duì)各種功能深入分析。

          在開始之前先介紹下ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)。

          image在上面的數(shù)據(jù)結(jié)構(gòu)中,ConcurrentLinkedQueue只有頭結(jié)點(diǎn)、尾節(jié)點(diǎn)兩個(gè)元素,而對(duì)于一個(gè)節(jié)點(diǎn)Node而言除了保存隊(duì)列元素item外,還有一個(gè)指向下一個(gè)節(jié)點(diǎn)的引用next。 看起來整個(gè)數(shù)據(jù)結(jié)構(gòu)還是比較簡單的。但是也有幾點(diǎn)是需要說明:

          1. 所有結(jié)構(gòu)(head/tail/item/next)都是volatile類型。 這是因?yàn)镃oncurrentLinkedQueue是非阻塞的,所以只有volatile才能使變量的寫操作對(duì)后續(xù)讀操作是可見的(這個(gè)是有happens-before法則保證的)。同樣也不會(huì)導(dǎo)致指令的重排序。
          2. 所有結(jié)構(gòu)的操作都帶有原子操作,這是由AtomicReferenceFieldUpdater保證的,這在原子操作中介紹過。它能保證需要的時(shí)候?qū)ψ兞康男薷牟僮魇窃拥摹?
          3. 由于隊(duì)列中任何一個(gè)節(jié)點(diǎn)(Node)只有下一個(gè)節(jié)點(diǎn)的引用,所以這個(gè)隊(duì)列是單向的,根據(jù)FIFO特性,也就是說出隊(duì)列在頭部(head),入隊(duì)列在尾部(tail)。頭部保存有進(jìn)入隊(duì)列最長時(shí)間的元素,尾部是最近進(jìn)入的元素。
          4. 沒有對(duì)隊(duì)列長度進(jìn)行計(jì)數(shù),所以隊(duì)列的長度是無限的,同時(shí)獲取隊(duì)列的長度的時(shí)間不是固定的,這需要遍歷整個(gè)隊(duì)列,并且這個(gè)計(jì)數(shù)也可能是不精確的。
          5. 初始情況下隊(duì)列頭和隊(duì)列尾都指向一個(gè)空節(jié)點(diǎn),但是非null,這是為了方便操作,不需要每次去判斷head/tail是否為空。但是head卻不作為存取元素的節(jié)點(diǎn),tail在不等于head情況下保存一個(gè)節(jié)點(diǎn)元素。也就是說head.item這個(gè)應(yīng)該一直是空,但是tail.item卻不一定是空(如果head!=tail,那么tail.item!=null)。

          對(duì)于第5點(diǎn),可以從ConcurrentLinkedQueue的初始化中看到。這種頭結(jié)點(diǎn)也叫“偽節(jié)點(diǎn)”,也就是說它不是真正的節(jié)點(diǎn),只是一標(biāo)識(shí),就像c中的字符數(shù)組后面的\0以后,只是用來標(biāo)識(shí)結(jié)束,并不是真正字符數(shù)組的一部分。

          private transient volatile Node<E> head = new Node<E>(null, null);
          private transient volatile Node<E> tail = head;

          有了上述5點(diǎn)再來解釋相關(guān)API操作就容易多了。

          在上一節(jié)中列出了add/offer/remove/poll/element/peek等價(jià)方法的區(qū)別,所以這里就不再重復(fù)了。

          清單1 入隊(duì)列操作

          public boolean offer(E e) {
              if (e == null) throw new NullPointerException();
              Node<E> n = new Node<E>(e, null);
              for (;;) {
                  Node<E> t = tail;
                  Node<E> s = t.getNext();
                  if (t == tail) {
                      if (s == null) {
                          if (t.casNext(s, n)) {
                              casTail(t, n);
                              return true;
                          }
                      } else {
                          casTail(t, s);
                      }
                  }
              }
          }

          清單1 描述的是入隊(duì)列的過程。整個(gè)過程是這樣的。

            1. 獲取尾節(jié)點(diǎn)t,以及尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)s。如果尾節(jié)點(diǎn)沒有被別人修改,也就是t==tail,進(jìn)行2,否則進(jìn)行1。
            2. 如果s不為空,也就是說此時(shí)尾節(jié)點(diǎn)后面還有元素,那么就需要把尾節(jié)點(diǎn)往后移,進(jìn)行1。否則進(jìn)行3。
            3. 修改尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為新節(jié)點(diǎn),如果成功就修改尾節(jié)點(diǎn),返回true。否則進(jìn)行1。

          從操作3中可以看到是先修改尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),然后才修改尾節(jié)點(diǎn)位置的,所以這才有操作2中為什么獲取到的尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)不為空的原因。

          特別需要說明的是,對(duì)尾節(jié)點(diǎn)的tail的操作需要換成臨時(shí)變量t和s,一方面是為了去掉volatile變量的可變性,另一方面是為了減少volatile的性能影響。

           

          清單2 描述的出隊(duì)列的過程,這個(gè)過程和入隊(duì)列相似,有點(diǎn)意思。

          頭結(jié)點(diǎn)是為了標(biāo)識(shí)隊(duì)列起始,也為了減少空指針的比較,所以頭結(jié)點(diǎn)總是一個(gè)item為null的非null節(jié)點(diǎn)。也就是說head!=null并且head.item==null總是成立。所以實(shí)際上獲取的是head.next,一旦將頭結(jié)點(diǎn)head設(shè)置為head.next成功就將新head的item設(shè)置為null。至于以前就的頭結(jié)點(diǎn)h,h.item=null并且h.next為新的head,但是由于沒有對(duì)h的引用,所以最終會(huì)被GC回收。這就是整個(gè)出隊(duì)列的過程。

          清單2 出隊(duì)列操作

          public E poll() {
              for (;;) {
                  Node<E> h = head;
                  Node<E> t = tail;
                  Node<E> first = h.getNext();
                  if (h == head) {
                      if (h == t) {
                          if (first == null)
                              return null;
                          else
                              casTail(t, first);
                      } else if (casHead(h, first)) {
                          E item = first.getItem();
                          if (item != null) {
                              first.setItem(null);
                              return item;
                          }
                          // else skip over deleted item, continue loop,
                      }
                  }
              }
          }

           

          另外對(duì)于清單3 描述的獲取隊(duì)列大小的過程,由于沒有一個(gè)計(jì)數(shù)器來對(duì)隊(duì)列大小計(jì)數(shù),所以獲取隊(duì)列的大小只能通過從頭到尾完整的遍歷隊(duì)列,顯然這個(gè)代價(jià)是很大的。所以通常情況下ConcurrentLinkedQueue需要和一個(gè)AtomicInteger搭配才能獲取隊(duì)列大小。后面介紹的BlockingQueue正是使用了這種思想。

          清單3 遍歷隊(duì)列大小

          public int size() {
              int count = 0;
              for (Node<E> p = first(); p != null; p = p.getNext()) {
                  if (p.getItem() != null) {
                      // Collections.size() spec says to max out
                      if (++count == Integer.MAX_VALUE)
                          break;
                  }
              }
              return count;
          }

           

           

           

           

          參考資料:

          1. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
          2. 多線程基礎(chǔ)總結(jié)十一—ConcurrentLinkedQueue
          3. 對(duì)ConcurrentLinkedQueue進(jìn)行的并發(fā)測試 

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2010-07-23 14:11 imxylz 閱讀(20013) 評(píng)論(2)  編輯  收藏 所屬分類: Java Concurrency

          評(píng)論

          # re: 深入淺出 Java Concurrency (20): 并發(fā)容器 part 5 ConcurrentLinkedQueue 2013-12-16 17:09 mashiguang
          為什么java.util.concurrent.ArrayBlockingQueue<E>只用一個(gè)int記錄count, 而java.util.concurrent.LinkedBlockingQueue<E>卻要用一個(gè)AtomicInteger記錄count  回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (20): 并發(fā)容器 part 5 ConcurrentLinkedQueue 2015-06-01 10:57 liubey
          @mashiguang
          我猜測應(yīng)該是ArrayBlockingQueue用一把鎖而LinkedBlockingQueue用兩把鎖的原因  回復(fù)  更多評(píng)論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 沈阳市| 富裕县| 巩留县| 华池县| 南木林县| 日土县| 得荣县| 林甸县| 宁津县| 东方市| 凤冈县| 泌阳县| 易门县| 车致| 岐山县| 福安市| 广灵县| 彭阳县| 化州市| 霍邱县| 吉木萨尔县| 龙井市| 湛江市| 台湾省| 隆德县| 麻阳| 临夏县| 佛坪县| 永登县| 石泉县| 攀枝花市| 台北县| 水城县| 东乡县| 汝城县| 龙口市| 湖口县| 嫩江县| 偏关县| 仲巴县| 龙川县|