posts - 241,  comments - 116,  trackbacks - 0
          今天就聊聊這兩種Queue,本文分為以下兩個部分,用分割線分開:
          • BlockingQueue
          • ConcurrentLinkedQueue,非阻塞算法


          首先來看看BlockingQueue
          Queue是什么就不需要多說了吧,一句話:隊列是先進先出。相對的,棧是后進先出。如果不熟悉的話先找本基礎的數據結構的書看看吧。

          BlockingQueue,顧名思義,“阻塞隊列”:可以提供阻塞功能的隊列。
          首先,看看BlockingQueue提供的常用方法:
                  Throws exceptionSpecial valueBlocks       Times out
          Insert  add(e)          offer(e)     put(e) offer(e, timeout, unit)
          Remove  remove()       poll()       take() poll(timeout, unit)
          Examine element()       peek()       not applicable not applicable

          從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是:
          • add(e) remove() element() 方法不會阻塞線程。當不滿足約束條件時,會拋出IllegalStateException 異常。例如:當隊列被元素填滿后,再調用add(e),則會拋出異常。
          • offer(e) poll() peek() 方法即不會阻塞線程,也不會拋出異常。例如:當隊列被元素填滿后,再調用offer(e),則不會插入元素,函數返回false。
          • 要想要實現阻塞功能,需要調用put(e) take() 方法。當不滿足約束條件時,會阻塞線程。

          好,上點源碼你就更明白了。以ArrayBlockingQueue類為例:
          對于第一類方法,很明顯如果操作不成功就拋異常。而且可以看到其實調用的是第二類的方法,為什么?因為第二類方法返回boolean啊。
          public boolean add(E e) {
               if (offer(e))
                   return true;
               else
                   throw new IllegalStateException("Queue full");//隊列已滿,拋異常
          }

          public E remove() {
              E x = poll();
              if (x != null)
                  return x;
              else
                  throw new NoSuchElementException();//隊列為空,拋異常
          }
          注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個類是線程安全類
          public boolean offer(E e) {
                  if (e == null) throw new NullPointerException();
                  final ReentrantLock lock = this.lock;
                  lock.lock();
                  try {
                      if (count == items.length)//隊列已滿,返回false
                          return false;
                      else {
                          insert(e);//insert方法中發出了notEmpty.signal();
                          return true;
                      }
                  } finally {
                      lock.unlock();
                  }
              }

          public E poll() {
                  final ReentrantLock lock = this.lock;
                  lock.lock();
                  try {
                      if (count == 0)//隊列為空,返回false
                          return null;
                      E x = extract();//extract方法中發出了notFull.signal();
                      return x;
                  } finally {
                      lock.unlock();
                  }
              }
          對于第三類方法,這里面涉及到Condition類,簡要提一下,
          await方法指:造成當前線程在接到信號或被中斷之前一直處于等待狀態。
          signal方法指:喚醒一個等待線程。
          public void put(E e) throws InterruptedException {
                  if (e == null) throw new NullPointerException();
                  final E[] items = this.items;
                  final ReentrantLock lock = this.lock;
                  lock.lockInterruptibly();
                  try {flex對應java數據類型
                      try {
                          while (count == items.length)//如果隊列已滿,等待notFull這個條件,這時當前線程被阻塞
                              notFull.await();
                      } catch (InterruptedException ie) {
                          notFull.signal(); //喚醒受notFull阻塞的當前線程
                          throw ie;
                      }
                      insert(e);
                  } finally {
                      lock.unlock();
                  }
              }

          public E take() throws InterruptedException {
                  final ReentrantLock lock = this.lock;
                  lock.lockInterruptibly();
                  try {
                      try {
                          while (count == 0)//如果隊列為空,等待notEmpty這個條件,這時當前線程被阻塞
                              notEmpty.await();
                      } catch (InterruptedException ie) {
                          notEmpty.signal();//喚醒受notEmpty阻塞的當前線程
                          throw ie;
                      }
                      E x = extract();
                      return x;
                  } finally {
                      lock.unlock();
                  }
              }
          第四類方法就是指在有必要時等待指定時間,就不詳細說了。

          再來看看BlockingQueue接口的具體實現類吧:
          • ArrayBlockingQueue,其構造函數必須帶一個int參數來指明其大小
          • LinkedBlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定
          • PriorityBlockingQueue,其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序
          上面是用ArrayBlockingQueue舉得例子,下面看看LinkedBlockingQueue
          首先,既然是鏈表,就應該有Node節點,它是一個內部靜態類:
          static class Node<E> {  
                  /** The item, volatile to ensure barrier separating write and read */  
                  volatile E item;  
                  Node<E> next;  
                  Node(E x) { item = x; }  
              }  
          然后,對于鏈表來說,肯定需要兩個變量來標示頭和尾:
              /** 頭指針 */  
              private transient Node<E> head;  //head.next是隊列的頭元素
              /** 尾指針 */  
              private transient Node<E> last;  //last.next是null
          么,對于入隊和出隊就很自然能理解了:
              private void enqueue(E x) {  
                  last = last.next = new Node<E>(x);  //入隊是為last再找個下家
              }  
           
              private E dequeue() {  
                  Node<E> first = head.next;  //出隊是把head.next取出來,然后將head向后移一位
                  head = first;  
                  E x = first.item;  
                  first.item = null;  
                  return x;  
              }  
          另外,LinkedBlockingQueue相對于ArrayBlockingQueue還有不同是,有兩個ReentrantLock,且隊列現有元素的大小由一個AtomicInteger對象標示。
          注:AtomicInteger類是以原子的方式操作整型變量。
              private final AtomicInteger count = new AtomicInteger(0);
              /** 用于讀取的獨占鎖*/  
              private final ReentrantLock takeLock = new ReentrantLock();  
              /** 隊列是否為空的條件 */  
              private final Condition notEmpty = takeLock.newCondition();  
              /** 用于寫入的獨占鎖 */  
              private final ReentrantLock putLock = new ReentrantLock();  
              /** 隊列是否已滿的條件 */  
              private final Condition notFull = putLock.newCondition();
          有兩個Condition很好理解,在ArrayBlockingQueue也是這樣做的。但是為什么需要兩個ReentrantLock呢?下面會慢慢道來。
          讓我們來看看offer和poll方法的代碼:
              public boolean offer(E e) {
                  if (e == null) throw new NullPointerException();
                  final AtomicInteger count = this.count;
                  if (count.get() == capacity)
                      return false;
                  int c = -1;
                  final ReentrantLock putLock = this.putLock;//入隊當然用putLock
                  putLock.lock();
                  try {
                      if (count.get() < capacity) {
                          enqueue(e); //入隊
                          c = count.getAndIncrement(); //隊長度+1
                          if (c + 1 < capacity)
                              notFull.signal(); //隊列沒滿,當然可以解鎖了
                      }
                  } finally {
                      putLock.unlock();
                  }
                  if (c == 0)
                      signalNotEmpty();//這個方法里發出了notEmpty.signal();
                  return c >= 0;
              }

             public E poll() {
                  final AtomicInteger count = this.count;
                  if (count.get() == 0)
                      return null;
                  E x = null;
                  int c = -1;
                  final ReentrantLock takeLock = this.takeLock;出隊當然用takeLock
                  takeLock.lock();
                  try {
                      if (count.get() > 0) {
                          x = dequeue();//出隊
                          c = count.getAndDecrement();//隊長度-1
                          if (c > 1)
                              notEmpty.signal();//隊列沒空,解鎖
                      }
                  } finally {
                      takeLock.unlock();
                  }
                  if (c == capacity)
                      signalNotFull();//這個方法里發出了notFull.signal();
                  return x;
              }
          看看源代碼發現和上面ArrayBlockingQueue的很類似,關鍵的問題在于:為什么要用兩個ReentrantLockputLock和takeLock?
          我們仔細想一下,入隊操作其實操作的只有隊尾引用last,并且沒有牽涉到head。而出隊操作其實只針對head,和last沒有關系。那么就 是說入隊和出隊的操作完全不需要公用一把鎖,所以就設計了兩個鎖,這樣就實現了多個不同任務的線程入隊的同時可以進行出隊的操作,另一方面由于兩個操作所 共同使用的count是AtomicInteger類型的,所以完全不用考慮計數器遞增遞減的問題。
          另外,還有一點需要說明一下:await()和singal()這兩個方法執行時都會檢查當前線程是否是獨占鎖的當前線程,如果不是則拋出 java.lang.IllegalMonitorStateException異常。所以可以看到在源碼中這兩個方法都出現在Lock的保護塊中。
          posted on 2011-07-18 09:51 墻頭草 閱讀(4360) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          人人游戲網 軟件開發網 貨運專家
          主站蜘蛛池模板: 宽甸| 鲁山县| 安吉县| 东兰县| 沾益县| 阳山县| 体育| 汽车| 永吉县| 海阳市| 莱芜市| 南城县| 巴南区| 泸溪县| 新蔡县| 舟山市| 满城县| 建昌县| 石泉县| 营山县| 平阳县| 吉木乃县| 拉萨市| 治多县| 泌阳县| 三亚市| 大城县| 宜阳县| 镇原县| 花莲市| 桦川县| 上蔡县| 白朗县| 合阳县| 清原| 桑日县| 龙门县| 临沧市| 梁平县| 临沂市| 尚义县|