xylz,imxylz

          關注后端架構、中間件、分布式和并發編程

             :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

          可以在對中對元素進行配對和交換的線程的同步點。每個線程將條目上的某個方法呈現給 exchange 方法,與伙伴線程進行匹配,并且在返回時接收其伙伴的對象。Exchanger 可能被視為 SynchronousQueue 的雙向形式。

          換句話說Exchanger提供的是一個交換服務,允許原子性的交換兩個(多個)對象,但同時只有一對才會成功。先看一個簡單的實例模型。

           

          Exchanger

          在上面的模型中,我們假定一個空的棧(Stack),棧頂(Top)當然是沒有元素的。同時我們假定一個數據結構Node,包含一個要交換的元素E和一個要填充的“洞”Node。這時線程T1攜帶節點node1進入棧(cas_push),當然這是CAS操作,這樣棧頂就不為空了。線程T2攜帶節點node2進入棧,發現棧里面已經有元素了node1,同時發現node1的hold(Node)為空,于是將自己(node2)填充到node1的hold中(cas_fill)。然后將元素node1從棧中彈出(cas_take)。這樣線程T1就得到了node1.hold.item也就是node2的元素e2,線程T2就得到了node1.item也就是e1,從而達到了交換的目的。

          算法描述就是下圖展示的內容。

          image

          JDK 5就是采用類似的思想實現的Exchanger。JDK 6以后為了支持多線程多對象同時Exchanger了就進行了改造(為了支持更好的并發),采用ConcurrentHashMap的思想,將Stack分割成很多的片段(或者說插槽Slot),線程Id(Thread.getId())hash相同的落在同一個Slot上,這樣在默認32個Slot上就有很好的吞吐量。當然會根據機器CPU內核的數量有一定的優化,有興趣的可以去了解下Exchanger的源碼。

          至于Exchanger的使用,在JDK文檔上有個例子,講述的是兩個線程交換數據緩沖區的例子(實際上仍然可以認為是生產者/消費者模型)。

          class FillAndEmpty {
             Exchanger
          <DataBuffer> exchanger = new Exchanger<DataBuffer>();
             DataBuffer initialEmptyBuffer
          =  a made-up type
             DataBuffer initialFullBuffer
          = 

            
          class FillingLoop implements Runnable {
              
          public void run() {
                 DataBuffer currentBuffer
          = initialEmptyBuffer;
                
          try {
                  
          while (currentBuffer != null) {
                     addToBuffer(currentBuffer);
                    
          if (currentBuffer.isFull())
                       currentBuffer
          = exchanger.exchange(currentBuffer);
                   }
                 }
          catch (InterruptedException ex) { handle }
               }
             }

            
          class EmptyingLoop implements Runnable {
              
          public void run() {
                 DataBuffer currentBuffer
          = initialFullBuffer;
                
          try {
                  
          while (currentBuffer != null) {
                     takeFromBuffer(currentBuffer);
                    
          if (currentBuffer.isEmpty())
                       currentBuffer
          = exchanger.exchange(currentBuffer);
                   }
                 }
          catch (InterruptedException ex) { handle }
               }
             }

            
          void start() {
              
          new Thread(new FillingLoop()).start();
              
          new Thread(new EmptyingLoop()).start();
             }
            }

           

          Exchanger實現的是一種數據分片的思想,這在大數據情況下將數據分成一定的片段并且多線程執行的情況下有一定的使用價值。

          最近一直推托工作忙,更新頻度越來越低了,好在現在的工作還有點個人時間,以后爭取多更新下吧,至少也要把這個專輯寫完。

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2010-11-22 22:31 imxylz 閱讀(7753) 評論(0)  編輯  收藏 所屬分類: Java Concurrency

          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 内丘县| 泊头市| 会昌县| 惠东县| 辽阳市| 叶城县| 云浮市| 广丰县| 韩城市| 深水埗区| 姚安县| 碌曲县| 保康县| 江陵县| 西昌市| 天镇县| 正镶白旗| 德保县| 崇信县| 凌云县| 象州县| 兴山县| 将乐县| 缙云县| 湘潭市| 岳阳市| 翼城县| 塔河县| 永靖县| 嘉鱼县| 瑞安市| 清镇市| 贵州省| 长白| 滨州市| 庄浪县| 密山市| 团风县| 社会| 枣阳市| 凤凰县|