上善若水
          In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
          posts - 146,comments - 147,trackbacks - 0

          當(dāng)前JDK對并發(fā)編程的支持

          Sun在Java5中引入了concurrent包,它對Java的并發(fā)編程提供了強(qiáng)大的支持。首先,它提供了Lock接口,可用了更細(xì)粒度的控制鎖的區(qū)域,它的實現(xiàn)類有ReentrantLock,ReadLock,WriteLock,其中ReadLock和WriteLock共同用于實現(xiàn)ReetrantReadWriteLock(它繼承自ReadWriteLock,但是沒有實現(xiàn)Lock接口,ReadWriteLock接口也沒有繼承Lock接口)。而且,它還提供了一些常用并發(fā)場景下的類工具:Semaphore、CountDownLatch和CyclicBarrier。它們個字的應(yīng)用場景:
          1. Semaphore(信號量)
            有n個非線程安全的資源(資源池),這些資源使用一個Semaphore(計數(shù)信號量)保護(hù),每個線程在使用這些資源時需要首先獲得一個信號量(acquire)表示當(dāng)前資源池還有可用資源,然后線程從該資源池中獲取并移除一個資源,在使用完后,將該資源交回給資源池,并釋放已經(jīng)獲得信號量(release)(這里的“移除”、“交回”并不一定需要顯示操作,只是一種形象的描述,之所以這么描述是應(yīng)為這里的各個資源是一樣的,因而對一個線程它每次拿到的資源不一定是同一個資源,用于區(qū)分Stripe的使用場景),其中Pool是一種典型的應(yīng)用。
          2. CountDownLatch(閉鎖)
            有n個Task,它們執(zhí)行完成后需要執(zhí)行另外一個收尾的Task(Aggregated Task),比如在做Report計算中,有n個Report要計算,而在所有Report計算完成后需要生成一個基于所有Report結(jié)果的一個總的Report,而這個總的Report需要等到所有Report計算出結(jié)果后才能開始,此時就可以定義一個CountDownLatch,其初始值是n,在總的Report計算前調(diào)用CountDownLatch的await方法等待其他Report執(zhí)行完成,而其他Report在完成后都會調(diào)用CountDownLatch中的countDown方法。
          3. CyclicBarrier(關(guān)卡)
            每個線程執(zhí)行完成后需要等待,直到n個線程都執(zhí)行完成后,才能繼續(xù)執(zhí)行,在n個線程執(zhí)行完成之后,而下一次執(zhí)行開始之前可以添加自定義邏輯(通過構(gòu)建CyclicBarrier實例時傳入一個Runnable實例自定義邏輯),即在每個線程執(zhí)行完成后調(diào)用CyclicBarrier的await方法并等待(即所謂的關(guān)卡),當(dāng)n個線程都完成后,自定義的Runnable實例會自動被執(zhí)行(如果存在這樣的Runnable實例的話),然后所有線程繼續(xù)下一次執(zhí)行。這個現(xiàn)實中的例子沒有想到比較合適的。。。。
          4. Exchanger(交換者)
            Exchanger是一種特殊的CyclicBarrier,它只有兩個線程參與,一個生產(chǎn)者,一個消費(fèi)者,有兩個隊列共同參與,生產(chǎn)者和消費(fèi)者各自有一個隊列,其中生產(chǎn)者向它的隊列添加數(shù)據(jù),而消費(fèi)者從它包含的隊列中拿數(shù)據(jù),當(dāng)生產(chǎn)者中的隊列滿時調(diào)用exchange方法,傳入自己原有的隊列,期待交換得到消費(fèi)者中空的隊列;而當(dāng)消費(fèi)者中的隊列滿時同樣調(diào)用exchange方法,傳入自己的原有隊列,期待獲取到生產(chǎn)者中已經(jīng)填滿的隊列。這樣,生產(chǎn)者和消費(fèi)者可以和諧的生產(chǎn)消費(fèi),并且它們的步驟是一致的(不管哪一方比另一方快都會等待另一方)。
          最后,Java5中還提供了一些atomic類以實現(xiàn)簡單場景下高效非lock方式的線程安全,以及BlockingQueue、Synchronizer、CompletionService、ConcurrentHashMap等工具類。

          在這里需要特別添加對ConcurrentHashMap的描述,因為Guava中的Stripe就是對ConcurrentHashMap實現(xiàn)思想的抽象。在《Java Core系列之ConcurrentHashMap實現(xiàn)(JDK 1.7)》一文中已經(jīng)詳細(xì)講述了ConcurrentHashMap的實現(xiàn),我們都知道ConcurrentHashMap的實現(xiàn)是基于Segment的,它內(nèi)部包含了多個Segment,因而它內(nèi)部的鎖是基于Segment而不是整個Map,從而減小了鎖的粒度,提升了性能。而這種分段鎖不僅僅在HashMap用到。

          Stripe的應(yīng)用場景

          雖然JDK中已經(jīng)為我們提供了很多用于并發(fā)編程的工具類,但是它并沒有提供對以下應(yīng)用場景的支持:有n個資源,我們希望對每個資源的操作都是線程安全的,這里我們不能用Semaphore,因為Semaphore是一個池的概念,它所管理的資源是同質(zhì)的,比如從數(shù)據(jù)庫的連接池中獲取Connection操作的一種實現(xiàn)方式是內(nèi)部保存一個Semaphore變量,在每次獲取Connection時,先調(diào)用Semaphore的acquire方法以保證連接池中還有空閑的Connection,如果有,則可以隨機(jī)的選擇一個Connection實例,當(dāng)Connection實例返回時,該Connection實例必須從空閑列表中移除,從而保證只有一個線程獲取到Connection,以保證一次只有一個線程使用一個Connection(在Java中數(shù)據(jù)庫的Connection是線程安全,但是我們在使用時依然會用連接池的方式創(chuàng)建多個Connection而不是在一個應(yīng)用程序中只用一個Connection是因為有些數(shù)據(jù)庫廠商在實現(xiàn)Connection時,一個Connection內(nèi)的所有操作都時串行的,而不是并行的,比如MySQL的Connection實現(xiàn),因而為了提升并行性,采用多個Connection方式)。而這里的需求是對每個資源的操作都是線程安全的,比如對JDK中HashMap的實現(xiàn)采用一個數(shù)組鏈表的結(jié)構(gòu)(參考《Java Core系列之HashMap實現(xiàn)》),如果我們將鏈表作為一個資源單位(這里的鏈表資源和上述的數(shù)據(jù)庫連接資源是不一樣的,對數(shù)據(jù)庫連接每個線程只需要拿到任意一個Connection實例即可,而這里的鏈表資源則是不同鏈表是不一樣的,因而對每個操作,我們需要獲取特定的鏈表,然后對鏈表以線程安全的方式操作,因為這里多個線程會對同一個鏈表同時操作),那么為了保證對各個單獨(dú)鏈表操作的線程安全(如HashMap的put操作,不考慮rehash的情況,有些其他操作需要更大粒度的線程安全,比如contains等),其中一種簡單的實現(xiàn)方式是為每條鏈表關(guān)聯(lián)一個鎖,對每條鏈表的讀寫操作使用其關(guān)聯(lián)鎖即可。然而如果鏈表很多,就需要使用很多鎖,會消耗很多資源,雖然它的鎖粒度最小,并發(fā)性很高。然而如果各個鏈表之間沒有很高的并發(fā)性,我們就可以讓多個鏈表共享一個鎖以減少鎖的使用量,雖然增大了鎖的粒度,但是如果這些鏈表的并發(fā)程度并不是很高,那增大的鎖的粒度對并發(fā)性并沒有很大的影響。

          在實際應(yīng)用中,我們有一個Cache系統(tǒng),它包含key和payload的鍵值對(Map),在Cache中Map的實現(xiàn)已經(jīng)是線程安全了,然而我們不僅僅是向Cache中寫數(shù)據(jù)要保證線程安全,在操作payload時,也需要保證線程安全。因為我們在Cache中的數(shù)據(jù)量很大,為每個payload配置一個單獨(dú)的鎖顯然不現(xiàn)實,也不需要因為它們沒有那么高的并發(fā)行,因而我們需要一種機(jī)制將key分成不同的group,而每個group共享一個鎖(這就是ConcurrentHashMap的實現(xiàn)思路)。通過key即可獲得一個鎖,并且每個相同的key獲得的鎖實例是相同的(獲得相同鎖實例的key它們不一定相等,因為這是一對多的關(guān)系)。

          Stripe的簡單實現(xiàn)

          根據(jù)以上應(yīng)用場景,Stripe的實現(xiàn)很簡單,只需要內(nèi)部保存一個Lock數(shù)組,對每個給定的key,計算其hash值,根據(jù)hash值計算其鎖對應(yīng)的數(shù)組下標(biāo),而該下標(biāo)下的Lock實例既是和該key關(guān)聯(lián)的Lock實例。這里通過hash值把key和Lock實例關(guān)聯(lián)起來,為了擴(kuò)展性,在實現(xiàn)時還可以把計算數(shù)組下標(biāo)的邏輯抽象成一個接口,用戶可以通過傳入自定義該接口的實現(xiàn)類實例加入用戶自定義的關(guān)聯(lián)邏輯,默認(rèn)采用hash值關(guān)聯(lián)方式。

          Stripe在Guava中的實現(xiàn)

          在Guava中,Stripe以抽象類的形式存在,它定義了通過給定key或index獲得相應(yīng)Lock/Semaphore/ReadWriteLock實例:
          public abstract class Striped<L> {
            /**
             * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
             * {
          @code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
             *
             * 
          @param key an arbitrary, non-null key
             * 
          @return the stripe that the passed key corresponds to
             
          */
            public abstract L get(Object key);

            /**
             * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
             * {
          @code size()}, exclusively.
             *
             * 
          @param index the index of the stripe to return; must be in {@code [0size())}
             * 
          @return the stripe at the specified index
             
          */
            public abstract L getAt(int index);

            /**
             * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
             
          */
            abstract int indexFor(Object key);

            /**
             * Returns the total number of stripes in this instance.
             
          */
            public abstract int size();

            /**
             * Returns the stripes that correspond to the passed objects, in ascending (as per
             * {
          @link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
             * by this method are guaranteed to not deadlock each other.
             *
             * <p>It should be noted that using a {
          @code Striped<L>} with relatively few stripes, and
             * {
          @code bulkGet(keys)} with a relative large number of keys can cause an excessive number
             * of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
             * are needed for a pair of them to match). Please consider carefully the implications of the
             * number of stripes, the intended concurrency level, and the typical number of keys used in a
             * {
          @code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
             * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
             * collisions.
             *
             * 
          @param keys arbitrary non-null keys
             * 
          @return the stripes corresponding to the objects (one per each object, derived by delegating
             *         to {
          @link #get(Object)}; may contain duplicates), in an increasing index order.
             
          */
            public Iterable<L> bulkGet(Iterable<?> keys);
          }
          可以使用一下幾個靜態(tài)工廠方法創(chuàng)建相應(yīng)的Striped實例,其中l(wèi)azyWeakXXX創(chuàng)建的Striped實例中鎖以弱引用的方式存在(在什么樣的場景中使用呢?):
          /**
           * Creates a {
          @code Striped<Lock>} with eagerly initialized, strongly referenced locks.
           * Every lock is reentrant.
           *
           * 
          @param stripes the minimum number of stripes (locks) required
           * 
          @return a new {@code Striped<Lock>}
           
          */
          public static Striped<Lock> lock(int stripes);
          /**
           * Creates a {
          @code Striped<Lock>} with lazily initialized, weakly referenced locks.
           * Every lock is reentrant.
           *
           * 
          @param stripes the minimum number of stripes (locks) required
           * 
          @return a new {@code Striped<Lock>}
           
          */
          public static Striped<Lock> lazyWeakLock(int stripes);
          /**
           * Creates a {
          @code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
           * with the specified number of permits.
           *
           * 
          @param stripes the minimum number of stripes (semaphores) required
           * 
          @param permits the number of permits in each semaphore
           * 
          @return a new {@code Striped<Semaphore>}
           
          */
          public static Striped<Semaphore> semaphore(int stripes, final int permits);
          /**
           * Creates a {
          @code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
           * with the specified number of permits.
           *
           * 
          @param stripes the minimum number of stripes (semaphores) required
           * 
          @param permits the number of permits in each semaphore
           * 
          @return a new {@code Striped<Semaphore>}
             
          */
          public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits);
          /**
           * Creates a {
          @code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
           * read-write locks. Every lock is reentrant.
           *
           * 
          @param stripes the minimum number of stripes (locks) required
           * 
          @return a new {@code Striped<ReadWriteLock>}
           
          */
          public static Striped<ReadWriteLock> readWriteLock(int stripes);
          /**
           * Creates a {
          @code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
           * read-write locks. Every lock is reentrant.
           *
           * 
          @param stripes the minimum number of stripes (locks) required
           * 
          @return a new {@code Striped<ReadWriteLock>}
           
          */
          public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes);

          Striped有兩個具體實現(xiàn)類,CompactStriped和LazyStriped,他們都繼承自PowerOfTwoStriped(用于表達(dá)內(nèi)部保存的stripes值是2的指數(shù)值)。PowerOfTwoStriped實現(xiàn)了indexFor()方法,它使用hash值做映射函數(shù):
            private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
              /** Capacity (power of two) minus one, for fast mod evaluation */
              final int mask;

              @Override final int indexFor(Object key) {
                int hash = smear(key.hashCode());
                return hash & mask;
              }
            }
            private static int smear(int hashCode) {
              hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
              return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
            }
          CompactStriped類使用一個數(shù)組保存所有的Lock/Semaphore/ReadWriteLock實例,在初始化時就建立所有的鎖實例;而LazyStriped類使用一個值為WeakReference的ConcurrentMap做為數(shù)據(jù)結(jié)構(gòu),index值為key,Lock/Semaphore/ReadWriteLock的WeakReference為值,所有鎖實例在用到時動態(tài)創(chuàng)建。在CompactStriped中創(chuàng)建鎖實例時對ReentrantLock/Semaphore創(chuàng)建采用PaddedXXX版本,不知道為何要做Pad。

          Striped類實現(xiàn)的類圖如下:
          posted on 2013-12-25 10:03 DLevin 閱讀(4232) 評論(3)  編輯  收藏 所屬分類: Guava

          FeedBack:
          # re: 深入Guava源碼之Stripe
          2013-12-26 18:08 | acha
          請問你的額UML是怎么做出來的?  回復(fù)  更多評論
            
          # re: 深入Guava源碼之Stripe
          2014-01-02 09:45 | DLevin
          用StarUML畫的~@acha
            回復(fù)  更多評論
            
          # re: 深入Guava源碼之Stripe
          2014-01-16 11:15 | 水水水水
          sss@acha
            回復(fù)  更多評論
            

          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 潞西市| 长宁县| 敖汉旗| 昌都县| 太仆寺旗| 彭水| 恩施市| 左贡县| 元谋县| 东乡| 新乐市| 巫山县| 恩施市| 阳信县| 祁阳县| 驻马店市| 朝阳市| 哈尔滨市| 大名县| 舞阳县| 襄垣县| 淳化县| 广昌县| 安平县| 绵竹市| 高雄市| 贡嘎县| 明星| 确山县| 甘德县| 丽水市| 云阳县| 西城区| 七台河市| 万载县| 黎平县| 东方市| 通江县| 毕节市| 陆河县| 汕头市|