java并發(fā)學習之一:CountDownLatch

          Posted on 2011-03-08 17:27 snake 閱讀(224) 評論(0)  編輯  收藏
          看了幾個月的《Java Concurrency in Practice》到了今天終于算可以收尾了,之前留下的看不懂的代碼,現(xiàn)在也基本明晰了一些

          全書介紹了很多細節(jié)問題,很多注意的點,很多原則性問題,個人感覺,無論看幾遍,都是值得的。但很多都是一些需要去記憶的東西,這個是需要經(jīng)驗的積累的。
          真正想在思考上,在設(shè)計上得到更大的提高,看來必然是要落在了concurrent包的數(shù)個同步器的實現(xiàn)的分析和對AQS的理解上了

          同步器是針對一些通用的場景設(shè)計的,由Doug Lea實現(xiàn)的,換句話來說,其實從同步器可以看出常用的一些需求,既然有了需求,又學會了工具(AQS框架)的使用,就可以按照自己的思維,試著重新去實現(xiàn)一下了


          CountDownLatch

          CountDownLatch是針對這樣一個需求設(shè)計的:n個參與者需要進行一件事情(比如開會),只有當他們都到了,才能正式開始,所以先到的需要等,并且同時開始

          首先,先用點“拙劣”的工具(1.5之前的實現(xiàn)方法)試著寫一下實現(xiàn)
          public class TestOfCountDownLatches {
          	
          	int count;
          	
          	Object lock = new Object();
          	
          	public TestOfCountDownLatches(int count)
          	{
          		this.count = count;
          	}
          	
          	public void countDown()
          	{
          		synchronized(lock)
          		{
          			count--;
          			if(count == 0)
          			{
          				lock.notifyAll();
          			}
          		}
          	}
          
                  public void await() throws InterruptedException
          	{
          		synchronized(lock)
          		{
          //中間還出現(xiàn)了點小問題:沒寫if(count == 0),這就導致了如果主線程先執(zhí)行了countdown,其他線程再執(zhí)行await,就會無限制的等待著了,從這個小錯誤也學會了并發(fā)框架的一個原則:阻塞還是不阻塞,應(yīng)該是嚴格依賴于狀態(tài)(state)的,而與先后次序沒關(guān)系
          			if(count != 0)
          				lock.wait();
          		}
          	}
          }
          


          測試方法:
          public long timeTasks1(int nThreads,final Runnable task)
          		{
          			final TestOfCountDownLatches endLatch = new TestOfCountDownLatches(nThreads);
          			final TestOfCountDownLatches startLatch = new TestOfCountDownLatches(1);
          			for(int i = 0;i<nThreads;i++)
          			{
          				Thread t = new Thread(){
          					public void run() {
          	                    try {
          	                    	startLatch.await();
          	                        try {
          	                            task.run();
          	                        } finally {
          	                        	endLatch.countDown();
          	                        }
          	                    } catch (InterruptedException ignored) { }
          	                }
          				};
          				t.start();
          			}
          			long start = System.nanoTime();			
          			try {
          				startLatch.countDown();
          				endLatch.await();
          			} catch (InterruptedException e) {
          				
          			}
          			long end = System.nanoTime();
          			return end-start;
          		}


          跑了一下,沒啥問題,與CountDownLatches的功能基本一致的

          再看看CountDownLatches的實現(xiàn)(主要是看它的同步器)
          private static final class Sync extends AbstractQueuedSynchronizer {
                  private static final long serialVersionUID = 4982264981922014374L;
          
                  Sync(int count) {
                      setState(count);
                  }
          
                  int getCount() {
                      return getState();
                  }
          //被await()調(diào)用
                  public int tryAcquireShared(int acquires) {
                      return getState() == 0? 1 : -1;
                  }
          //被countDown()調(diào)用
                  public boolean tryReleaseShared(int releases) {
                      // Decrement count; signal when transition to zero
                      for (;;) {
                          int c = getState();
                          if (c == 0)
                              return false;
                          int nextc = c-1;
                          if (compareAndSetState(c, nextc))
                              return nextc == 0;
                      }
                  }
              }
          


          在實現(xiàn)原理上是一致的,await時比較一下state(相當于第一個版本的count),如果滿足了,則不阻塞(返回1),如果不滿足,阻塞(返回-1)(關(guān)于阻塞與否的代碼都在AQS里面)
          在countDown時則進行一下--操作,如果滿足0“return nextc == 0;”了,則釋放阻塞

          既然主要目的是學習AQS,就debug了一下,盡量去了解AQS的實現(xiàn)原理
          主要了解到以下幾個方面:(下面的描述只針對AQS的shared模式)
          1.AQS沒用正常的wait,notifyAll,lock的阻塞等方法,而是用了一個LockSupport對象來支持類似操作,該對象的方法全是本地方法,相當于對synchronized和wait等操作用了一個統(tǒng)一的形式
          2.AQS中維持了一個等待隊列,很奇特的是,該等待隊列“完全沒用鎖”,比如說一個acquice操作,當沒成功的時候,是需要將該線程進入等待隊列的,多線程的訪問,所以這個入隊的過程應(yīng)該要保證它的原子性
          但AQS沒有這樣做,它只用了一些非常高效的改變int型的原子方法(由Unsafe提供),針對每一個需要保證原子性的操作(如入隊),它都用了一個while(true)這樣的形式,一旦失敗(比如發(fā)現(xiàn)隊尾已經(jīng)被別的線程改變了),則從新獲取當前狀態(tài),重新入隊,很神奇,很高效,同時也需要考慮得面面俱到的一個實現(xiàn),在文章中稱之為“非阻塞算法”,在后文會對該算法寫一篇詳細介紹
          雖然用1.5之前的wait等方法可以實現(xiàn),但無疑用AQS這套框架會更高效,響應(yīng)更快

          再貼一個使用CountDownLatches的例子做為結(jié)尾
          public long timeTasks2(int nThreads, final Runnable task)
          	            throws InterruptedException {
          	        final CountDownLatch startGate = new CountDownLatch(1);
          	        final CountDownLatch endGate = new CountDownLatch(nThreads);
          
          	        for (int i = 0; i < nThreads; i++) {
          	            Thread t = new Thread() {
          	                public void run() {
          	                    try {
          	                        startGate.await();
          	                        try {
          	                            task.run();
          	                        } finally {
          	                            endGate.countDown();
          	                        }
          	                    } catch (InterruptedException ignored) { }
          	                }
          	            };
          	            t.start();
          	        }
          
          	        long start = System.nanoTime();
          	        startGate.countDown();
          	        endGate.await();
          	        long end = System.nanoTime();
          	        return end-start;
          	    }
          


          已有 0 人發(fā)表留言,猛擊->>這里<<-參與討論


          ItEye推薦




          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導航:
           
          主站蜘蛛池模板: 富川| 封丘县| 邵武市| 海丰县| 丘北县| 博野县| 鲁甸县| 定州市| 昆山市| 清远市| 惠州市| 成都市| 门头沟区| 开原市| 鲁山县| 象州县| 惠州市| 遂宁市| 布拖县| 莱州市| 涡阳县| 鄂伦春自治旗| 旬阳县| 从江县| 鲁甸县| 双城市| 五峰| 衡阳县| 屏东县| 彭州市| 阜新| 仁化县| 琼中| 朔州市| 内江市| 尼木县| 扶余县| 福安市| 昌邑市| 寻乌县| 得荣县|