隨筆-7  評論-23  文章-0  trackbacks-0
          JAVA LOCK總體來說關鍵要素主要包括3:
          1.unsafe.compareAndSwapXXX(Object o,long offset,int expected,int x)
          2.unsafe.park() unsafe.unpark()
          3.單向鏈表結構或者說存儲線程的數據結構

          1主要為了保證鎖的原子性,相當于一個鎖是否正在被使用的標記,并且比較和設置這個標記的操作是原子的(硬件提供的swap和test_and_set指令,單CPU下同一指令的多個指令周期不可中斷,SMP中通過鎖總線支持上訴兩個指令的原子性),這基本等于軟件級別所能達到的最高級別隔離。

          2主要將未得到鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(這幾個native方法的實現代碼在hotspot\src\share\vm\prims\unsafe.cpp文件中,但是關鍵代碼park的最終實現是和操作系統相關的,比如windows下實現是在os_windows.cpp中,有興趣的同學可以下載jdk源碼查看)。喚醒一個被park()線程主要手段包括以下幾種
          1.       其他線程調用以被park()線程為參數的unpark(Thread thread).
          2.       其他線程中斷被park()線程,waiters.peek().interrupt();waiters為存儲線程對象的隊列.
          3.       不知原因的返回。

          park()方法返回并不會報告到底是上訴哪種返回,所以返回好最好檢查下線程狀態,如

          LockSupport.park();  //禁用當前線程
          If(Thread.interrupted){
             
          //doSomething
          }

          AbstractQueuedSynchronizer(AQS)對于這點實現得相當巧妙,如下所示
          private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
              
          final Node node = addWaiter(Node.SHARED);
              
          try {
                   
          for (;;) {
                       
          final Node p = node.predecessor();
                       
          if (p == head) {
                           
          int r = tryAcquireShared(arg);
                           
          if (r >= 0{
                               setHeadAndPropagate(node, r);
                               p.next 
          = null// help GC
                               return;
                           }

                       }

                       
          //parkAndCheckInterrupt()會返回park住的線程在被unpark后的線程狀態,如果線程中斷,跳出循環。
                       if (shouldParkAfterFailedAcquire(p, node) &&
                           parkAndCheckInterrupt())
                           
          break;
                   }

               }
           catch (RuntimeException ex) {
                    cancelAcquire(node);
                    
          throw ex;
               }

              
               
          // 只有線程被interrupt后才會走到這里
               cancelAcquire(node);
               
          throw new InterruptedException();
          }


          //在park()住的線程被unpark()后,第一時間返回當前線程是否被打斷
          private final boolean parkAndCheckInterrupt() {
              LockSupport.park(
          this);
              
          return Thread.interrupted();
          }
          3點對于一個Synchronizer的實現非常重要,存儲等待線程,并且unlock時喚醒等待線程,這中間有很多工作需要做,喚醒策略,等待線程意外終結處理,公平非公平,可重入不可重入等。


          以上簡單說明了下
          JAVA LOCKS關鍵要素,現在我們來看下java.util.concurrent.locks大致結構

          上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,為何圖中沒有用UML線表示呢,這是每個Lock實現類都持有自己內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。為何要實現不同的Sync呢?這和每種Lock用途相關。另外還有AQSState機制。

          基于AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不一樣而已。

          ReentrantLock需要記錄當前線程獲取原子狀態的次數,如果次數為零,那么就說明這個線程放棄了鎖(也有可能其他線程占據著鎖從而需要等待),如果次數大于1,也就是獲得了重進入的效果,而其他線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。以下為ReetranLock的FairSync的tryAcquire實現代碼解析。

          //公平獲取鎖
          protected final boolean tryAcquire(int acquires) {
              
          final Thread current = Thread.currentThread();
              
          int c = getState();
              
          //如果當前重進入數為0,說明有機會取得鎖
              if (c == 0{
                  
          //如果是第一個等待者,并且設置重進入數成功,那么當前線程獲得鎖
                  if (isFirst(current) &&
                      compareAndSetState(
          0, acquires)) {
                      setExclusiveOwnerThread(current);
                      
          return true;
                  }

              }

              
          //如果當前線程本身就持有鎖,那么疊加重進入數,并且繼續獲得鎖
              else if (current == getExclusiveOwnerThread()) {
                  
          int nextc = c + acquires;
                  
          if (nextc < 0)
                      
          throw new Error("Maximum lock count exceeded");
                  setState(nextc);
                  
          return true;
               }

               
          //以上條件都不滿足,那么線程進入等待隊列。
               return false;
          }

          Semaphore則是要記錄當前還有多少次許可可以使用,到0,就需要等待,也就實現并發量的控制,Semaphore一開始設置許可數為1,實際上就是一把互斥鎖。以下為Semaphore的FairSync實現

          protected int tryAcquireShared(int acquires) {
              Thread current 
          = Thread.currentThread();
              
          for (;;) {
                   Thread first 
          = getFirstQueuedThread();
                   
          //如果當前等待隊列的第一個線程不是當前線程,那么就返回-1表示當前線程需要等待
                   if (first != null && first != current)
                        
          return -1;
                   
          //如果當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那么先取得semaphore還有幾個許可證,并且減去當前線程需要的許可證得到剩下的值
                   int available = getState();
                   
          int remaining = available - acquires;
                   
          //如果remining<0,那么反饋給AQS當前線程需要等待,如果remaining>0,并且設置availble成功設置成剩余數,那么返回剩余值(>0),也就告知AQS當前線程拿到許可,可以繼續執行。
                   if (remaining < 0 ||compareAndSetState(available, remaining))
                       
          return remaining;
              }

          }

          CountDownLatch閉鎖則要保持其狀態,在這個狀態到達終止態之前,所有線程都會被park住,閉鎖可以設定初始值,這個值的含義就是這個閉鎖需要被countDown()幾次,因為每次CountDownsync.releaseShared(1),而一開始初始值為10的話,那么這個閉鎖需要被countDown()十次,才能夠將這個初始值減到0,從而釋放原子狀態,讓等待的所有線程通過。

          //await時候執行,只查看當前需要countDown數量減為0了,如果為0,說明可以繼續執行,否則需要park住,等待countDown次數足夠,并且unpark所有等待線程
          public int tryAcquireShared(int acquires) {
               
          return getState() == 0? 1 : -1;
          }


          //countDown時候執行,如果當前countDown數量為0,說明沒有線程await,直接返回false而不需要喚醒park住線程,如果不為0,得到剩下需要countDown的數量并且compareAndSet,最終返回剩下的countDown數量是否為0,供AQS判定是否釋放所有await線程。
          public boolean tryReleaseShared(int releases) {
              
          for (;;) {
                   
          int c = getState();
                   
          if (c == 0)
                       
          return false;
                   
          int nextc = c-1;
                   
          if (compareAndSetState(c, nextc))
                       
          return nextc == 0;
             }

          }

          FutureTask需要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQSacquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續還是park,FutureTasktryAcquireShared方法所做的唯一事情就是檢查狀態,如果是RUNNING狀態那么讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask 實例的set方法(與等待線程持相同的實例),設定執行結果,并且通過unpark喚醒正在等待的線程,返回結果。

          //get時待用,只檢查當前任務是否完成或者被Cancel,如果未完成并且沒有被cancel,那么告訴AQS當前線程需要進入等待隊列并且park住
          protected int tryAcquireShared(int ignore) {
               
          return innerIsDone()? 1 : -1;
          }


          //判定任務是否完成或者被Cancel
          boolean innerIsDone() {
              
          return ranOrCancelled(getState()) &&    runner == null;
          }


          //get時調用,對于CANCEL與其他異常進行拋錯
          V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
              
          if (!tryAcquireSharedNanos(0,nanosTimeout))
                  
          throw new TimeoutException();
              
          if (getState() == CANCELLED)
                  
          throw new CancellationException();
              
          if (exception != null)
                  
          throw new ExecutionException(exception);
              
          return result;
          }


          //任務的執行線程執行完畢調用(set(V v))
          void innerSet(V v) {
               
          for (;;) {
                  
          int s = getState();
                  
          //如果線程任務已經執行完畢,那么直接返回(多線程執行任務?)
                  if (s == RAN)
                      
          return;
                  
          //如果被CANCEL了,那么釋放等待線程,并且會拋錯
                  if (s == CANCELLED) {
                      releaseShared(
          0);
                      
          return;
                  }

                  
          //如果成功設定任務狀態為已完成,那么設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及后續清理工作(一般由FutrueTask的子類實現)
                  if (compareAndSetState(s, RAN)) {
                      result 
          = v;
                      releaseShared(
          0);
                      done();
                      
          return;
                  }

               }

          }

          以上4AQS的使用是比較典型,然而有個問題就是這些狀態存在哪里呢?并且是可以計數的。從以上4個example,我們可以很快得到答案,AQS提供給了子類一個int state屬性。并且暴露給子類getState()setState()兩個方法(protected)。這樣就為上述狀態解決了存儲問題,RetrantLock可以將這個state用于存儲當前線程的重進入次數,Semaphore可以用這個state存儲許可數,CountDownLatch則可以存儲需要被countDown的次數,而Future則可以存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其他的Synchronizer存儲他們的一些狀態。

          AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryReleaseisHeldExclusively三個方法為需要獨占形式獲取的synchronizer實現的,比如線程獨占ReetranLockSync,而tryAcquireSharedtryReleasedShared為需要共享形式獲取的synchronizer實現。

          ReentrantLock內部Sync類實現的是tryAcquire,tryRelease, isHeldExclusively三個方法(因為獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSyncNonfairSync實現)Semaphore內部類Sync則實現了tryAcquireSharedtryReleasedShared(CountDownLatch相似,因為公平性問題,tryAcquireShared由其內部類FairSyncNonfairSync實現)CountDownLatch內部類Sync實現了tryAcquireSharedtryReleasedSharedFutureTask內部類Sync也實現了tryAcquireSharedtryReleasedShared

          其實使用過一些JAVA synchronizer的之后,然后結合代碼,能夠很快理解其到底是如何做到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實我們自己也可以構造synchronizer。如下是一個LOCK API的一個例子,實現了一個先入先出的互斥鎖。

          public class FIFOMutex {
              
          private AtomicBoolean locked=new AtomicBoolean(false);
              
          private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();
              
              
          public void lock(){
                  
          boolean wasInterrupted=false;
                  Thread current
          =Thread.currentThread();
                  waiters.add(current);
                  
                  
          //如果waiters的第一個等待者不為當前線程,或者當前locked的狀態為被占用(true)
                  
          //那么park住當前線程
                  while(waiters.peek()!=current||!locked.compareAndSet(falsetrue)){
                      LockSupport.park();
                      
                      
          //當線程被unpark時,第一時間檢查當前線程是否被interrupted
                      if(Thread.interrupted()){
                          wasInterrupted
          =true;
                      }

                  }

                  
                  
          //得到鎖后,從等待隊列移除當前線程,如果,并且如果當前線程已經被interrupted,
                  
          //那么再interrupt一下以便供外部響應。
                  waiters.remove();
                  
          if(wasInterrupted){
                      current.interrupt();
                  }

              }

              
              
          //unlock邏輯相對簡單,設定當前鎖為空閑狀態,并且將等待隊列中
              
          //的第一個等待線程喚醒
              public void unlock(){
                  locked.set(
          false);
                  LockSupport.unpark(waiters.peek());
              }

          }

          總結,JAVA lock機制對于整個java concurrent包的成員意義重大,了解這個機制對于使用java并發類有著很多的幫助,文章中可能存在著各種錯誤,請各位多多諒解并且能夠提出來,謝謝。

          文章參考:JDK 1.6 source
                              java 并發編程實踐
                              JDK 1.6 API 文檔

           

          posted on 2010-09-30 12:05 BucketLI 閱讀(13158) 評論(2)  編輯  收藏

          評論:
          # re: JAVA LOCK代碼淺析 2012-10-30 10:32 | mengmeng.zhangmm
          非常清晰的講解了Concurrent的QAS機制和部分并發工具類實現。
          謝謝樓主的博文。  回復  更多評論
            
          # re: JAVA LOCK代碼淺析 2014-10-28 10:43 | zuidaima
          java 線程demo教程源代碼下載:http://zuidaima.com/share/k%E7%BA%BF%E7%A8%8B-p1-s1.htm  回復  更多評論
            

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 乐亭县| 威海市| 独山县| 遂宁市| 云浮市| 库伦旗| 莱西市| 嘉定区| 赣州市| 泰安市| 石门县| 芦溪县| 灵丘县| 剑川县| 布尔津县| 腾冲县| 高邑县| 页游| 会宁县| 东乡族自治县| 新竹市| 高青县| 罗平县| 河北区| 广水市| 延安市| 彰武县| 新昌县| 平昌县| 华阴市| 图片| 同仁县| 灵石县| 昌图县| 基隆市| 西贡区| 化州市| 宁阳县| 新绛县| 澄城县| 普陀区|