上善若水
          In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
          posts - 146,comments - 147,trackbacks - 0

          一直想好好學(xué)習(xí)concurrent包中的各個類的實現(xiàn),然而經(jīng)常看了一點就因為其他事情干擾而放下了。發(fā)現(xiàn)這樣太不利于自己的成長了,因而最近打算潛心一件一件的完成自己想學(xué)習(xí)的東西。

          對concurrent包的學(xué)習(xí)打算先從Lock的實現(xiàn)開始,因而自然而然的就端起了AbstractQueuedSynchronizer,然而要讀懂這個類的源碼并不是那么容易,因而我就開始問自己一個問題:如果自己要去實現(xiàn)這個一個Lock對象,應(yīng)該如何實現(xiàn)呢?

          要實現(xiàn)Lock對象,首先理解什么是鎖?我自己從編程角度簡單的理解,所謂鎖對象(互斥鎖)就是它能保證一次只有一個線程能進入它保護的臨界區(qū),如果有一個線程已經(jīng)拿到鎖對象,那么其他對象必須讓權(quán)等待,而在該線程退出這個臨界區(qū)時需要喚醒等待列表中的其他線程。更學(xué)術(shù)一些,《計算機操作系統(tǒng)》中對同步機制準(zhǔn)則的歸納(P50):

          1. 空閑讓進。當(dāng)無進程處于臨界區(qū)時,表明臨界資源處于空閑狀態(tài),應(yīng)允許一個請求進入臨界區(qū)的進程立即進入自己的臨界區(qū),以有效的利用臨界資源。
          2. 忙則等待。當(dāng)已有進程進入臨界區(qū)時,表明臨界資源正在被訪問,因而其他試圖進入臨界區(qū)的進程必須等待,以保證對臨界區(qū)資源的互斥訪問。
          3. 有限等待。對要求訪問臨界資源的進程,應(yīng)保證在有限時間內(nèi)能進入自己的臨界區(qū),以免陷入“死等”狀態(tài)。
          4. 讓權(quán)等待。當(dāng)進程不能進入自己的臨界區(qū)時,應(yīng)該釋放處理機,以免進程陷入“忙等”狀態(tài)。

          說了那么多,其實對互斥鎖很簡單,只需要一個標(biāo)記位,如果該標(biāo)記位為0,表示沒有被占用,因而直接獲得鎖,然后把該標(biāo)記位置為1,此時其他線程發(fā)現(xiàn)該標(biāo)記位已經(jīng)是1,因而需要等待。這里對這個標(biāo)記位的比較并設(shè)值必須是原子操作,而在JDK5以后提供的atomic包里的工具類可以很方便的提供這個原子操作。然而上面的四個準(zhǔn)則應(yīng)該漏了一點,即釋放鎖的線程(進程)和得到鎖的線程(進程)應(yīng)該是同一個,就像一把鑰匙對應(yīng)一把鎖(理想的),所以一個非常簡單的Lock類可以這么實現(xiàn):

          public class SpinLockV1 {
              
          private final AtomicInteger state = new AtomicInteger(0);
              
          private volatile Thread owner; // 這里owner字段可能存在中間值,不可靠,因而其他線程不可以依賴這個字段的值
              
              
          public void lock() {
                  
          while (!state.compareAndSet(01)) { }
                  owner 
          = Thread.currentThread();
              }
              
              
          public void unlock() {
                  Thread currentThread 
          = Thread.currentThread();
                  
          if (owner != currentThread || !state.compareAndSet(10)) {
                      
          throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
                  }
                  owner 
          = null;
              }
          }

          一個簡單的測試方法:

              @Test
              
          public void testLockCorrectly() throws InterruptedException {
                  
          final int COUNT = 100;
                  Thread[] threads 
          = new Thread[COUNT];
                  SpinLockV1 lock 
          = new SpinLockV1();
                  AddRunner runner 
          = new AddRunner(lock);
                  
          for (int i = 0; i < COUNT; i++) { 
                      threads[i] 
          = new Thread(runner, "thread-" + i);
                      threads[i].start();
                  }
                  
                  
          for (int i = 0; i < COUNT; i++) {
                      threads[i].join();
                  }
                  
                  assertEquals(COUNT, runner.getState());
              }
              
              
          private static class AddRunner implements Runnable {
                  
          private final SpinLockV1 lock;
                  
          private int state = 0;

                  
          public AddRunner(SpinLockV1 lock) {
                      
          this.lock = lock;
                  }
                  
                  
          public void run() {
                      lock.lock();
                      
          try {
                          quietSleep(
          10);
                          state
          ++;
                          System.out.println(Thread.currentThread().getName() 
          + "" + state);
                      } 
          finally {
                          lock.unlock();
                      }
                  }
                  
                  
          public int getState() {
                      
          return state;
                  }
              }

          然而這個SpinLock其實并不需要state這個字段,因為owner的賦值與否也是一種狀態(tài),因而可以用它作為一種互斥狀態(tài):

          public class SpinLockV2 {
              
          private final AtomicReference<Thread> owner = new AtomicReference<Thread>(null);
              
              
          public void lock() {
                  
          final Thread currentThread = Thread.currentThread();
                  
          while (!owner.compareAndSet(null, currentThread)) { }
              }
              
              
          public void unlock() {
                  Thread currentThread 
          = Thread.currentThread();
                  
          if (!owner.compareAndSet(currentThread, null)) {
                      
          throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
                  }
              }
          }

          這在操作系統(tǒng)中被定義為整形信號量,然而整形信號量如果沒拿到鎖會一直處于“忙等”狀態(tài)(沒有遵循有限等待和讓權(quán)等待的準(zhǔn)則),因而這種鎖也叫Spin Lock,在短暫的等待中它可以提升性能,因為可以減少線程的切換,concurrent包中的Atomic大部分都采用這種機制實現(xiàn),然而如果需要長時間的等待,“忙等”會占用不必要的CPU時間,從而性能會變的很差,這個時候就需要將沒有拿到鎖的線程放到等待列表中,這種方式在操作系統(tǒng)中也叫記錄型信號量,它遵循了讓權(quán)等待準(zhǔn)則(當(dāng)前沒有實現(xiàn)有限等待準(zhǔn)則)。在JDK6以后提供了LockSupport.park()/LockSupport.unpark()操作,可以將當(dāng)前線程放入一個等待列表或?qū)⒁粋€線程從這個等待列表中喚醒。然而這個park/unpark的等待列表是一個全局的等待列表,在unpartk的時候還是需要提供需要喚醒的Thread對象,因而我們需要維護自己的等待列表,但是如果我們可以用JDK提供的工具類ConcurrentLinkedQueue,就非常容易實現(xiàn),如LockSupport文檔中給出來的代碼事例

          class FIFOMutex {
             
          private final AtomicBoolean locked = new AtomicBoolean(false);
             
          private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

             
          public void lock() {
               
          boolean wasInterrupted = false;
               Thread current 
          = Thread.currentThread();
               waiters.add(current);

               
          // Block while not first in queue or cannot acquire lock
               while (waiters.peek() != current || !locked.compareAndSet(falsetrue)) {
                  LockSupport.park(
          this);
                  
          if (Thread.interrupted()) // ignore interrupts while waiting
                    wasInterrupted = true;
               }

               waiters.remove();
               
          if (wasInterrupted)          // reassert interrupt status on exit
                  current.interrupt();
             }

             
          public void unlock() {
               locked.set(
          false);
               LockSupport.unpark(waiters.peek());
             }
           }

          在該代碼事例中,有一個線程等待隊列和鎖標(biāo)記字段,每次調(diào)用lock時先將當(dāng)前線程放入這個等待隊列中,然后拿出隊列頭線程對象,如果該線程對象正好是當(dāng)前線程,并且成功 使用CAS方式設(shè)置locked字段(這里需要兩個同時滿足,因為可能出現(xiàn)一個線程已經(jīng)從隊列中移除了但還沒有unlock,此時另一個線程調(diào)用lock方法,此時隊列頭的線程就是第二個線程,然而由于第一個線程還沒有unlock或者正在unlock,因而需要使用CAS原子操作來判斷是否要park),表示該線程競爭成功,獲得鎖,否則將當(dāng)前線程park,這里之所以要放在 while循環(huán)中,因為park操作可能無理由返回(spuriously),如文檔中給出的描述:

          LockSupport.park()
          public static void park(Object blocker)
          Disables the current thread for thread scheduling purposes unless the permit is available.

          If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

          • Some other thread invokes unpark with the current thread as the target; or
          • Some other thread interrupts the current thread; or
          • The call spuriously (that is, for no reason) returns.

          This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.

          Parameters:
          blocker - the synchronization object responsible for this thread parking
          Since:
          1.6
          我在實現(xiàn)自己的類時就被這個“無理由返回”坑了好久。對于已經(jīng)獲得鎖的線程,將該線程從等待隊列中移除,這里由于ConcurrentLinkedQueue是線程安全的,因而能保證每次都是隊列頭的線程得到鎖,因而在得到鎖匙將隊列頭移除。unlock邏輯比較簡單,只需要將locked字段打開(設(shè)置為false),喚醒(unpark)隊列頭的線程即可,然后該線程會繼續(xù)在lock方法的while循環(huán)中繼續(xù)競爭unlocked字段,并將它自己從線程隊列中移除表示獲得鎖成功。當(dāng)然安全起見,最好在unlock中加入一些驗證邏輯,如解鎖的線程和加鎖的線程需要相同。

          然而本文的目的是自己實現(xiàn)一個Lock對象,即只使用一些基本的操作,而不使用JDK提供的Atomic類和ConcurrentLinkedQueue。類似的首先我們也需要一個隊列存放等待線程隊列(公平起見,使用先進先出隊列),因而先定義一個Node對象用以構(gòu)成這個隊列:

           

              protected static class Node {
                  
          volatile Thread owner;
                  
          volatile Node prev;
                  
          volatile Node next;
                  
                  
          public Node(Thread owner) {
                      
          this.owner = owner;
                      
          this.state = INIT;
                  }
                  
                  
          public Node() {
                      
          this(Thread.currentThread());
                  }
              }

          簡單起見,隊列頭是一個起點的placeholder,每個調(diào)用lock的線程都先將自己競爭放入這個隊列尾,每個隊列頭后一個線程(Node)即是獲得鎖的線程,所以我們需要有head Node字段用以快速獲取隊列頭的后一個Node,而tail Node字段用來快速插入新的Node,所以關(guān)鍵在于如何線程安全的構(gòu)建這個隊列,方法還是一樣的,使用CAS操作,即CAS方法將自己設(shè)置成tail值,然后重新構(gòu)建這個列表:

              protected boolean enqueue(Node node) {
                  
          while (true) {
                      
          final Node preTail = tail;
                      node.prev 
          = preTail;
                      
          if (compareAndSetTail(preTail, node)) {
                          preTail.next 
          = node;
                          
          return node.prev == head;
                      }
                  }
              }

          在當(dāng)前線程Node以線程安全的方式放入這個隊列后,lock實現(xiàn)相對就比較簡單了,如果當(dāng)前Node是的前驅(qū)是head,該線程獲得鎖,否則park當(dāng)前線程,處理park無理由返回的問題,因而將park放入while循環(huán)中(該實現(xiàn)是一個不可重入的實現(xiàn)):

              public void lock() {
                  
          // Put the latest node to a queue first, then check if the it is the first node
                  
          // this way, the list is the only shared resource to deal with
                  Node node = new Node();
                  
          if (enqueue(node)) {
                      current 
          = node.owner;
                  } 
          else {
                      
          while (node.prev != head) {
                          LockSupport.park(
          this); // This may return "spuriously"!!, so put it to while
                      }

                      current 
          = node.owner;
                  }
              }

          unlock的實現(xiàn)需要考慮多種情況,如果當(dāng)前Node(head.next)有后驅(qū),那么直接unpark該后驅(qū)即可;如果沒有,表示當(dāng)前已經(jīng)沒有其他線程在等待隊列中,然而在這個判斷過程中可能會有其他線程進入,因而需要用CAS的方式設(shè)置tail,如果設(shè)置失敗,表示此時有其他線程進入,因而需要將該新進入的線程unpark從而該新進入的線程在調(diào)用park后可以立即返回(這里的CAS和enqueue的CAS都是對tail操作,因而能保證狀態(tài)一致):

              public void unlock() {
                  Node curNode 
          = unlockValidate();
                  Node next 
          = curNode.next;
                  
          if (next != null) {
                     
          head.next = next;
                      next.prev 
          = head;
                      LockSupport.unpark(next.owner);
                  } 
          else {
                      
          if (!compareAndSetTail(curNode, head)) {
                         
          while (curNode.next == null) { } // Wait until the next available
                          // Another node queued during the time, so we have to unlock that, or else, this node can never unparked
                          unlock();
                      } 
          else {
                         
          compareAndSetNext(head, curNode, null); // Still use CAS here as the head.next may already been changed
                      }
                  }
              }

          具體的代碼和測試類可以參考查看這里


          其實直到自己寫完這個類后才直到者其實這是一個MCS鎖的變種,因而這個實現(xiàn)每個線程park在自身對應(yīng)的node上,而由前一個線程unpark它;而AbstractQueuedSynchronizer是CLH鎖,因為它的park由前驅(qū)狀態(tài)決定,雖然它也是由前一個線程unpark它。具體可以參考這里

          posted on 2015-08-11 06:08 DLevin 閱讀(5446) 評論(0)  編輯  收藏 所屬分類: CodeToolsMultiThreading
          主站蜘蛛池模板: 涡阳县| 东阳市| 大厂| 高青县| 山西省| 精河县| 宜宾市| 峨边| 武邑县| 旺苍县| 竹北市| 濮阳市| 泸定县| 林口县| 土默特左旗| 大邑县| 贵阳市| 海口市| 麻江县| 远安县| 海门市| 呼伦贝尔市| 望都县| 河北区| 随州市| 巴彦淖尔市| 永嘉县| 抚顺市| 泾阳县| 乐安县| 讷河市| 哈巴河县| 洪雅县| 台南市| 云浮市| 临猗县| 濮阳市| 临高县| 晋宁县| 新源县| 青河县|