whitesock

          BlogJava 首頁 新隨筆 聯系 聚合 管理
            10 Posts :: 0 Stories :: 0 Comments :: 0 Trackbacks

          Inside AbstractQueuedSynchronizer (1)

          Inside AbstractQueuedSynchronizer (2)

          Inside AbstractQueuedSynchronizer (3)

          Inside AbstractQueuedSynchronizer (4)

          3 AbstractQueuedSynchronizer

          3.1 Inheritance

          AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer繼承自Object并且實現了Serializable接口,它只有一個成員變量private transient Thread exclusiveOwnerThread以及對應的getter/setter方法。該成員變量用于保存當前擁有排他訪問權的線程。需要注意的是,該成員變量沒有用volatile關鍵字修飾。


          3.2 State

          AbstractQueuedSynchronizer用一個int(private volatile int state)來保存同步狀態,以及對應的getter/setter/compareAndSetState方法。Java6新增了一個AbstractQueuedLongSynchronizer,它用一個long來保存同步狀態,貌似目前沒有被java.util.concurrent中的其它synchronizer所使用。對于ReentrantLock,state為0則意味著鎖沒有被任何線程持有;否則state保存了持有鎖的線程的重入次數。

          3.3 WaitQueue

          WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的線程。它的實現是"CLH" (Craig, Landin, and Hagersten) lock queue的一個變種。

          標準的CLH lock queue通常被用來實現spin lock,它通過TheadLocal變量pred引用隊列中的前一個節點(Node本身沒有指向前后節點的引用),以下是標準的CLH lock queue的一個參考實現:

          public class ClhSpinLock {
              private final ThreadLocal<Node> pred;
              private final ThreadLocal<Node> node;
              private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());
          
              public ClhSpinLock() {
                  this.node = new ThreadLocal<Node>() {
                      protected Node initialValue() {
                          return new Node();
                      }
                  };
          
                  this.pred = new ThreadLocal<Node>() {
                      protected Node initialValue() {
                          return null;
                      }
                  };
              }
          
              public void lock() {
                  final Node node = this.node.get();
                  node.locked = true;
                  Node pred = this.tail.getAndSet(node);
                  this.pred.set(pred);
                  while (pred.locked) {}
              }
          
              public void unlock() {
                  final Node node = this.node.get();
                  node.locked = false;
                  this.node.set(this.pred.get());
              }
          
              private static class Node {
                  private volatile boolean locked;
              }
          }

          其邏輯并不復雜:對于lock操作,只需要通過一個CAS操作即可將當前線程對應的節點加入到隊列中,并且同時獲得了predecessor節點的引用,然后就是等待predecessor釋放鎖;對于unlock操作,只需要將當前線程對應節點的locked成員變量設置為false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node對象,這是對GC友好的一個優化。如果不考慮這個優化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解決了cache-coherence traffic的問題:每個線程在busy loop的時候,并沒有競爭同一個狀態,而是只判斷其對應predecessor的鎖定狀態。如果你擔心false sharing問題,那么可以考慮將鎖定狀態padding到cache line的長度。此外,CLH spin lock通過FIFO的隊列保證了鎖競爭的公平性。

          AbstractQueuedSynchronizer的靜態內部類Node維護了一個FIFO的等待隊列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是為了支持鎖等待超時(timeout)和鎖等待回退(cancellation)的功能。sucessor的作用是為了支持線程阻塞:在Inside AbstractQueuedSynchronizer (1) 中提到過,AbstractQueuedSynchronizer通過LockSupport實現了線程的block/unblock,因此需要通過successor引用找到后續的線程并將其喚醒(CLH spin lock因為是spin,所以不需要顯式地喚醒)。此外,Node中還包括一個volatile int waitStatus成員變量用于控制線程的阻塞/喚醒,以及避免不必要的調用LockSupport的park/unpark方法。需要注意的是,雖然AbstractQueuedSynchronizer在絕大多數情況下是通過LockSupport進行線程的阻塞/喚醒,但是在特定情況下也會使用spin lock,static final long spinForTimeoutThreshold = 1000L這個靜態變量設定了使用spin lock的一個閾值。

          對WaitQueue進行enqueue操作相關的代碼如下:

          private Node addWaiter(Node mode) {
              Node node = new Node(Thread.currentThread(), mode);
              // Try the fast path of enq; backup to full enq on failure
              Node pred = tail;
              if (pred != null) {
                  node.prev = pred;
                  if (compareAndSetTail(pred, node)) {
                      pred.next = node;
                      return node;
                  }
              }
              enq(node);
              return node;
          }
          
          private Node enq(final Node node) {
              for (;;) {
                  Node t = tail;
                  if (t == null) { // Must initialize
                      if (compareAndSetHead(new Node()))
                          tail = head;
                  } else {
                      node.prev = t;
                      if (compareAndSetTail(t, node)) {
                          t.next = node;
                          return t;
                      }
                  }
              }
          }

          addWaiter方法中的那個if分支,其實是一種優化,如果失敗那么會調用enq方法進行enqueue。需要注意的是,以上代碼中對next引用的設定是在enqueue成功之后進行的。這樣做雖然沒有并發問題,但是在判斷一個node是否有sucessor時,不能僅僅通過next == null來判斷,因為enqueue和設置next引用這兩個步驟不是一個原子操作。

          對WaitQueue進行dequeue操作相關的代碼如下:

          private void setHead(Node node) {
              head = node;
              node.thread = null;
              node.prev = null;
          }

          setHead方法沒有用到鎖,也沒有使用CAS,這樣沒有并發問題?沒有,因為這個方法只會被持有鎖的線程所調用,此時只需要將head指向持有鎖的線程對應的node即可。


          posted on 2012-01-07 17:54 whitesock 閱讀(329) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 吐鲁番市| 林周县| 剑河县| 冷水江市| 株洲市| 阿尔山市| 华坪县| 碌曲县| 崇明县| 温泉县| 双辽市| 太和县| 宣城市| 星子县| 章丘市| 牡丹江市| 阿拉善盟| 板桥市| 天津市| 海淀区| 聂荣县| 厦门市| 凌海市| 楚雄市| 长兴县| 彰武县| 雅安市| 铁力市| 凤庆县| 石城县| 红原县| 阿荣旗| 巫山县| 云林县| 阳信县| 云龙县| 陇西县| 油尖旺区| 孝义市| 五常市| 乡宁县|