上善若水
          In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
          posts - 146,comments - 147,trackbacks - 0

          一直想好好學(xué)習(xí)concurrent包中的各個類的實(shí)現(xiàn),然而經(jīng)常看了一點(diǎn)就因?yàn)槠渌虑楦蓴_而放下了。發(fā)現(xiàn)這樣太不利于自己的成長了,因而最近打算潛心一件一件的完成自己想學(xué)習(xí)的東西。

          對concurrent包的學(xué)習(xí)打算先從Lock的實(shí)現(xiàn)開始,因而自然而然的就端起了AbstractQueuedSynchronizer,然而要讀懂這個類的源碼并不是那么容易,因而我就開始問自己一個問題:如果自己要去實(shí)現(xiàn)這個一個Lock對象,應(yīng)該如何實(shí)現(xiàn)呢?

          要實(shí)現(xiàn)Lock對象,首先理解什么是鎖?我自己從編程角度簡單的理解,所謂鎖對象(互斥鎖)就是它能保證一次只有一個線程能進(jìn)入它保護(hù)的臨界區(qū),如果有一個線程已經(jīng)拿到鎖對象,那么其他對象必須讓權(quán)等待,而在該線程退出這個臨界區(qū)時需要喚醒等待列表中的其他線程。更學(xué)術(shù)一些,《計(jì)算機(jī)操作系統(tǒng)》中對同步機(jī)制準(zhǔn)則的歸納(P50):

          1. 空閑讓進(jìn)。當(dāng)無進(jìn)程處于臨界區(qū)時,表明臨界資源處于空閑狀態(tài),應(yīng)允許一個請求進(jìn)入臨界區(qū)的進(jìn)程立即進(jìn)入自己的臨界區(qū),以有效的利用臨界資源。
          2. 忙則等待。當(dāng)已有進(jìn)程進(jìn)入臨界區(qū)時,表明臨界資源正在被訪問,因而其他試圖進(jìn)入臨界區(qū)的進(jìn)程必須等待,以保證對臨界區(qū)資源的互斥訪問。
          3. 有限等待。對要求訪問臨界資源的進(jìn)程,應(yīng)保證在有限時間內(nèi)能進(jìn)入自己的臨界區(qū),以免陷入“死等”狀態(tài)。
          4. 讓權(quán)等待。當(dāng)進(jìn)程不能進(jìn)入自己的臨界區(qū)時,應(yīng)該釋放處理機(jī),以免進(jìn)程陷入“忙等”狀態(tài)。

          說了那么多,其實(shí)對互斥鎖很簡單,只需要一個標(biāo)記位,如果該標(biāo)記位為0,表示沒有被占用,因而直接獲得鎖,然后把該標(biāo)記位置為1,此時其他線程發(fā)現(xiàn)該標(biāo)記位已經(jīng)是1,因而需要等待。這里對這個標(biāo)記位的比較并設(shè)值必須是原子操作,而在JDK5以后提供的atomic包里的工具類可以很方便的提供這個原子操作。然而上面的四個準(zhǔn)則應(yīng)該漏了一點(diǎn),即釋放鎖的線程(進(jìn)程)和得到鎖的線程(進(jìn)程)應(yīng)該是同一個,就像一把鑰匙對應(yīng)一把鎖(理想的),所以一個非常簡單的Lock類可以這么實(shí)現(xiàn):

          public class SpinLockV1 {
              
          private final AtomicInteger state = new AtomicInteger(0);
              
          private volatile Thread owner; // 這里owner字段可能存在中間值,不可靠,因而其他線程不可以依賴這個字段的值
              
              
          public void lock() {
                  
          while (!state.compareAndSet(01)) { }
                  owner 
          = Thread.currentThread();
              }
              
              
          public void unlock() {
                  Thread currentThread 
          = Thread.currentThread();
                  
          if (owner != currentThread || !state.compareAndSet(10)) {
                      
          throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
                  }
                  owner 
          = null;
              }
          }

          一個簡單的測試方法:

              @Test
              
          public void testLockCorrectly() throws InterruptedException {
                  
          final int COUNT = 100;
                  Thread[] threads 
          = new Thread[COUNT];
                  SpinLockV1 lock 
          = new SpinLockV1();
                  AddRunner runner 
          = new AddRunner(lock);
                  
          for (int i = 0; i < COUNT; i++) { 
                      threads[i] 
          = new Thread(runner, "thread-" + i);
                      threads[i].start();
                  }
                  
                  
          for (int i = 0; i < COUNT; i++) {
                      threads[i].join();
                  }
                  
                  assertEquals(COUNT, runner.getState());
              }
              
              
          private static class AddRunner implements Runnable {
                  
          private final SpinLockV1 lock;
                  
          private int state = 0;

                  
          public AddRunner(SpinLockV1 lock) {
                      
          this.lock = lock;
                  }
                  
                  
          public void run() {
                      lock.lock();
                      
          try {
                          quietSleep(
          10);
                          state
          ++;
                          System.out.println(Thread.currentThread().getName() 
          + "" + state);
                      } 
          finally {
                          lock.unlock();
                      }
                  }
                  
                  
          public int getState() {
                      
          return state;
                  }
              }

          然而這個SpinLock其實(shí)并不需要state這個字段,因?yàn)閛wner的賦值與否也是一種狀態(tài),因而可以用它作為一種互斥狀態(tài):

          public class SpinLockV2 {
              
          private final AtomicReference<Thread> owner = new AtomicReference<Thread>(null);
              
              
          public void lock() {
                  
          final Thread currentThread = Thread.currentThread();
                  
          while (!owner.compareAndSet(null, currentThread)) { }
              }
              
              
          public void unlock() {
                  Thread currentThread 
          = Thread.currentThread();
                  
          if (!owner.compareAndSet(currentThread, null)) {
                      
          throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
                  }
              }
          }

          這在操作系統(tǒng)中被定義為整形信號量,然而整形信號量如果沒拿到鎖會一直處于“忙等”狀態(tài)(沒有遵循有限等待和讓權(quán)等待的準(zhǔn)則),因而這種鎖也叫Spin Lock,在短暫的等待中它可以提升性能,因?yàn)榭梢詼p少線程的切換,concurrent包中的Atomic大部分都采用這種機(jī)制實(shí)現(xiàn),然而如果需要長時間的等待,“忙等”會占用不必要的CPU時間,從而性能會變的很差,這個時候就需要將沒有拿到鎖的線程放到等待列表中,這種方式在操作系統(tǒng)中也叫記錄型信號量,它遵循了讓權(quán)等待準(zhǔn)則(當(dāng)前沒有實(shí)現(xiàn)有限等待準(zhǔn)則)。在JDK6以后提供了LockSupport.park()/LockSupport.unpark()操作,可以將當(dāng)前線程放入一個等待列表或?qū)⒁粋€線程從這個等待列表中喚醒。然而這個park/unpark的等待列表是一個全局的等待列表,在unpartk的時候還是需要提供需要喚醒的Thread對象,因而我們需要維護(hù)自己的等待列表,但是如果我們可以用JDK提供的工具類ConcurrentLinkedQueue,就非常容易實(shí)現(xiàn),如LockSupport文檔中給出來的代碼事例

          class FIFOMutex {
             
          private final AtomicBoolean locked = new AtomicBoolean(false);
             
          private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

             
          public void lock() {
               
          boolean wasInterrupted = false;
               Thread current 
          = Thread.currentThread();
               waiters.add(current);

               
          // Block while not first in queue or cannot acquire lock
               while (waiters.peek() != current || !locked.compareAndSet(falsetrue)) {
                  LockSupport.park(
          this);
                  
          if (Thread.interrupted()) // ignore interrupts while waiting
                    wasInterrupted = true;
               }

               waiters.remove();
               
          if (wasInterrupted)          // reassert interrupt status on exit
                  current.interrupt();
             }

             
          public void unlock() {
               locked.set(
          false);
               LockSupport.unpark(waiters.peek());
             }
           }

          在該代碼事例中,有一個線程等待隊(duì)列和鎖標(biāo)記字段,每次調(diào)用lock時先將當(dāng)前線程放入這個等待隊(duì)列中,然后拿出隊(duì)列頭線程對象,如果該線程對象正好是當(dāng)前線程,并且成功 使用CAS方式設(shè)置locked字段(這里需要兩個同時滿足,因?yàn)榭赡艹霈F(xiàn)一個線程已經(jīng)從隊(duì)列中移除了但還沒有unlock,此時另一個線程調(diào)用lock方法,此時隊(duì)列頭的線程就是第二個線程,然而由于第一個線程還沒有unlock或者正在unlock,因而需要使用CAS原子操作來判斷是否要park),表示該線程競爭成功,獲得鎖,否則將當(dāng)前線程park,這里之所以要放在 while循環(huán)中,因?yàn)閜ark操作可能無理由返回(spuriously),如文檔中給出的描述:

          LockSupport.park()
          public static void park(Object blocker)
          Disables the current thread for thread scheduling purposes unless the permit is available.

          If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

          • Some other thread invokes unpark with the current thread as the target; or
          • Some other thread interrupts the current thread; or
          • The call spuriously (that is, for no reason) returns.

          This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.

          Parameters:
          blocker - the synchronization object responsible for this thread parking
          Since:
          1.6
          我在實(shí)現(xiàn)自己的類時就被這個“無理由返回”坑了好久。對于已經(jīng)獲得鎖的線程,將該線程從等待隊(duì)列中移除,這里由于ConcurrentLinkedQueue是線程安全的,因而能保證每次都是隊(duì)列頭的線程得到鎖,因而在得到鎖匙將隊(duì)列頭移除。unlock邏輯比較簡單,只需要將locked字段打開(設(shè)置為false),喚醒(unpark)隊(duì)列頭的線程即可,然后該線程會繼續(xù)在lock方法的while循環(huán)中繼續(xù)競爭unlocked字段,并將它自己從線程隊(duì)列中移除表示獲得鎖成功。當(dāng)然安全起見,最好在unlock中加入一些驗(yàn)證邏輯,如解鎖的線程和加鎖的線程需要相同。

          然而本文的目的是自己實(shí)現(xiàn)一個Lock對象,即只使用一些基本的操作,而不使用JDK提供的Atomic類和ConcurrentLinkedQueue。類似的首先我們也需要一個隊(duì)列存放等待線程隊(duì)列(公平起見,使用先進(jìn)先出隊(duì)列),因而先定義一個Node對象用以構(gòu)成這個隊(duì)列:

           

              protected static class Node {
                  
          volatile Thread owner;
                  
          volatile Node prev;
                  
          volatile Node next;
                  
                  
          public Node(Thread owner) {
                      
          this.owner = owner;
                      
          this.state = INIT;
                  }
                  
                  
          public Node() {
                      
          this(Thread.currentThread());
                  }
              }

          簡單起見,隊(duì)列頭是一個起點(diǎn)的placeholder,每個調(diào)用lock的線程都先將自己競爭放入這個隊(duì)列尾,每個隊(duì)列頭后一個線程(Node)即是獲得鎖的線程,所以我們需要有head Node字段用以快速獲取隊(duì)列頭的后一個Node,而tail Node字段用來快速插入新的Node,所以關(guān)鍵在于如何線程安全的構(gòu)建這個隊(duì)列,方法還是一樣的,使用CAS操作,即CAS方法將自己設(shè)置成tail值,然后重新構(gòu)建這個列表:

              protected boolean enqueue(Node node) {
                  
          while (true) {
                      
          final Node preTail = tail;
                      node.prev 
          = preTail;
                      
          if (compareAndSetTail(preTail, node)) {
                          preTail.next 
          = node;
                          
          return node.prev == head;
                      }
                  }
              }

          在當(dāng)前線程N(yùn)ode以線程安全的方式放入這個隊(duì)列后,lock實(shí)現(xiàn)相對就比較簡單了,如果當(dāng)前Node是的前驅(qū)是head,該線程獲得鎖,否則park當(dāng)前線程,處理park無理由返回的問題,因而將park放入while循環(huán)中(該實(shí)現(xiàn)是一個不可重入的實(shí)現(xiàn)):

              public void lock() {
                  
          // Put the latest node to a queue first, then check if the it is the first node
                  
          // this way, the list is the only shared resource to deal with
                  Node node = new Node();
                  
          if (enqueue(node)) {
                      current 
          = node.owner;
                  } 
          else {
                      
          while (node.prev != head) {
                          LockSupport.park(
          this); // This may return "spuriously"!!, so put it to while
                      }

                      current 
          = node.owner;
                  }
              }

          unlock的實(shí)現(xiàn)需要考慮多種情況,如果當(dāng)前Node(head.next)有后驅(qū),那么直接unpark該后驅(qū)即可;如果沒有,表示當(dāng)前已經(jīng)沒有其他線程在等待隊(duì)列中,然而在這個判斷過程中可能會有其他線程進(jìn)入,因而需要用CAS的方式設(shè)置tail,如果設(shè)置失敗,表示此時有其他線程進(jìn)入,因而需要將該新進(jìn)入的線程unpark從而該新進(jìn)入的線程在調(diào)用park后可以立即返回(這里的CAS和enqueue的CAS都是對tail操作,因而能保證狀態(tài)一致):

              public void unlock() {
                  Node curNode 
          = unlockValidate();
                  Node next 
          = curNode.next;
                  
          if (next != null) {
                     
          head.next = next;
                      next.prev 
          = head;
                      LockSupport.unpark(next.owner);
                  } 
          else {
                      
          if (!compareAndSetTail(curNode, head)) {
                         
          while (curNode.next == null) { } // Wait until the next available
                          // Another node queued during the time, so we have to unlock that, or else, this node can never unparked
                          unlock();
                      } 
          else {
                         
          compareAndSetNext(head, curNode, null); // Still use CAS here as the head.next may already been changed
                      }
                  }
              }

          具體的代碼和測試類可以參考查看這里


          其實(shí)直到自己寫完這個類后才直到者其實(shí)這是一個MCS鎖的變種,因而這個實(shí)現(xiàn)每個線程park在自身對應(yīng)的node上,而由前一個線程unpark它;而AbstractQueuedSynchronizer是CLH鎖,因?yàn)樗膒ark由前驅(qū)狀態(tài)決定,雖然它也是由前一個線程unpark它。具體可以參考這里

          posted on 2015-08-11 06:08 DLevin 閱讀(5447) 評論(0)  編輯  收藏 所屬分類: CodeToolsMultiThreading
          主站蜘蛛池模板: 保康县| 开鲁县| 米泉市| 三门峡市| 竹山县| 奈曼旗| 精河县| 武义县| 红原县| 虎林市| 邵阳市| 南雄市| 子洲县| 明星| 巴彦淖尔市| 兰溪市| 宁明县| 札达县| 武宣县| 广水市| 彭州市| 定陶县| 上犹县| 东乌珠穆沁旗| 老河口市| 清远市| 中阳县| 富宁县| 鄂州市| 辛集市| 巧家县| 万安县| 东乡族自治县| 从江县| 星子县| 正安县| 榆社县| 衡阳县| 三亚市| 洛南县| 抚宁县|