java.util.concurrent提供了多種并發容器,總體上來說有4類,隊列類型的BlockingQueue和 ConcurrentLinkedQueue,Map類型的ConcurrentMap,Set類型的ConcurrentSkipListSet和CopyOnWriteArraySet,List類型的CopyOnWriteArrayList.
這些并發容器都采用了多種手段控制并發的存取操作,并且盡可能減小控制并發所帶來的性能損耗。接下來我們會對每一種類型的實現類進行代碼分析,進而得到java.util.con current包所提供的并發容器在傳統容器上所做的工作。
2. BlockingQueue
BlockingQueue接口定義的所有方法實現都是線程安全的,它的實現類里面都會用鎖和其他控制并發的手段保證這種線程安全,但是這些類同時也實現了Collection接口(主要是AbstractQueue實現),所以會出現BlockingQueue的實現類也能同時使用Conllection接口方法,而這時會出現的問題就是像addAll,containsAll,retainAll和removeAll這類批量方法的實現不保證線程安全,舉個例子就是addAll 10個items到一個ArrayBlockingQueue,可能中途失敗但是卻有幾個item已經被放進這個隊列里面了。
下面我們根據這幅類圖來逐個解析不同實現類的特性和特性實現代碼

DelayQueue提供了一個只返回超時元素的阻塞隊列,也就是說,即使隊列中已經有數據了,但是poll或者take的時候還要判定這個element有沒達到規定的超時時間,poll方法在element還沒達到規定的超時時間返回null,take則會通過condition.waitNanos()進入等待狀態。一般存儲的element類型為Delayed,這個接口JDK中實現的類有ScheduledFutureTask,而DelayQueue為DelayedWorkQueue的Task容器,后者是ScheduledThreadPoolExecutor的工作隊列,所以DelayQueue所具有的超時提供元素和線程安全特性對于并發的定時任務有很大的意義。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//控制并發
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
//condition協調隊列里面元素
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
//因為first在隊列里面的delay最短的(優先隊列保證),所以wait這個時間那么隊列中最短delay的元素就超時了.即
//隊列有元素供應了.
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
DelayQueue的內部數據結構是PriorityQueue,因為Delayed接口同時繼承了Comparable接口,并且Delayed的實現類對于這個compareTo方法的實現是基于超時時間進行大小比較,所以DelayQueue無需關心數據的排序問題,只需要做好存取的并發控制(ReetranLock)和超時判定即可。另外,DelayQueue有一個實現細節就是通過一個Condition來協調隊列中是否有數據可以提供,這對于take和帶有提取超時時間的poll是有意義的(生產者,消費者的實現)。final ReentrantLock lock = this.lock;
//控制并發
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
//condition協調隊列里面元素
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
//因為first在隊列里面的delay最短的(優先隊列保證),所以wait這個時間那么隊列中最短delay的元素就超時了.即
//隊列有元素供應了.
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
PriorityBlockingQueue實現對于外部而言是按照元素的某種順序返回元素,同時對存取提供并發保護(ReetranLock),使用Condition協調隊列是否有新元素提供。PriorityBlocking Queue內部的數據結構為PriorityQueue,優先級排序工作交給PriorityQueue,至于怎么排序,需要根據插入元素的Comparable的接口實現,和DelayQueue比起來,它沒有限定死插入數據的Comparable實現,而DelayQueue的元素實現Comparable必須按照超時時間的長短進行比較,否則DelayQueue返回的元素就很可能是錯誤的。
ArrayBlockingQueue是一個先入先出的隊列,內部數據結構為一個數組,并且一旦創建這個隊列的長度是不可改變的,當然put數據時,這個隊列也不會自動增長。ArrayBlockingQueue也是使用ReetranLock來保證存取的原子性,不過使用了notEmpty和notFull兩個Condition來協調隊列為空和隊列為滿的狀態轉換,插入數據的時候,判定當前內部數據結構數組E[] items的長度是否等于元素計數,如果相等,說明隊列滿,notFull.await(),直到items數組重新不為滿(removeAt,poll等),插入數據后notEmpty.sinal()通知所有取數據或者移除數據并且因為items為空而等待的線程可以繼續進行操作了。提取數據或者移除數據的過程剛好相反。
ArrayBlockingQueue使用三個數字來維護隊列里面的數據變更,包括takeIndex,putIndex,count,這里需要講一下takeIndex和putIndex,其中takeIndex指向下一個能夠被提取的元素,而putIndex指向下一個能夠插入數據的位置,實現類似下圖的結構,當takeIndex移到內部數組items最大長度時,重新賦值為0,也就是回到數組頭部,putIndex也是相同的策略.

















//put后,使用Condition通知正在等待take的線程可以做提取操作

















































//無論是offerLast,offerFirst,pollFirst,pollLast等等方法都會使用同一把鎖.
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
3. ConcurrentMap
ConcurrentMap定義了V putIfAbsent(K key,V value),Boolean remove(Object Key,Object value),Boolean replace(K key,V oldValue,V newValue)以及V replace(K key,V value)四個方法,幾個方法的特性并不難理解,4個方法都是線程安全的。
ConcurrentHashMap是ConcurrentMap的一個實現類,這個類的實現相當經典,基本思想就是分拆鎖,默認ConcurrentHashMap會實例化一個持有16個Segment對象的數組,Segment數組大小是可以設定的,構造函數里的concurrencyLevel指定這個值,但是需要注意的是,這個值并不是直接賦值.Segment數組最大長度為MAX_SEGMENTS = 1 << 16












下面3段代碼是ConcurrentHashMap的初始化Segment,計算hash值,以及如何選擇Segment的代碼以及示例注解.
















































*外部用戶傳進來劣質的hash值(比如重復度很高)所帶來
*的危害.











Segment的put方法在lock()后,首先對數組長度加了新的元素之后是否會超過閾值threshold進行了判定,如果超過,那么進行rehash(),rehash()的過程相對繁瑣,首先數組會自動增長一倍,然后需要對HashEntry數組中的所有元素都需要重新計算hash值,并且置到新數組的新的位置,同時為了減小操作損耗,將原來不需要移動的數據不做移動操作(power-of-two expansion,在默認threshold,在數組擴大一倍時只需要移動1/6元素,其他都可以不動)。所有動作完成之后,通過一個while循環尋找Segment中是否有相同Key存在,如果已經存在,那么根據onlyIfAbsent參數確定是否替換(如果為true,不替換,如果為false,替換掉value),然后返回替換的value,如果不存在,那么新生成一個HashEntry,并且根據一開始計算出來的index放到數組指定位置,并且累積元素計數,返回put的值。最后unlock()釋放掉鎖.
4. CopyOnWriteArrayList和CopyOnWriteArraySet
CopyOnWriteList是線程安全的List實現,其底層數據存儲結構為數組(Object[] array),它在讀操作遠遠多于寫操作的場景下表現良好,這其中的原因在于其讀操作(get(),indexOf(),isEmpty(),contains())不加任何鎖,而寫操作(set(),add(),remove())通過Arrays.copyOf()操作拷貝當前底層數據結構(array),在其上面做完增刪改等操作,再將新的數組置為底層數據結構,同時為了避免并發增刪改, CopyOnWriteList在這些寫操作上通過一個ReetranLock進行并發控制。另外需要注意的是,CopyOnWriteList所實現的迭代器其數據也是底層數組鏡像,所以在CopyOnWriteList進行interator,同時并發增刪改CopyOnWriteList里的數據實不會拋“ConcurrentModificationException”,當然在迭代器上做remove,add,set也是無效的(拋UnsupportedOperationExcetion),因為迭代器上的數據只是當前List的數據數組的一個拷貝而已。
CopyOnWriteSet是一個線程安全的Set實現,然后持有一個CopyOnWriteList實例,其所有的操作都是這個CopyOnWriteList實例來實現的。CopyOnWriteSet與CopyOnWriteList的區別實際上就是Set與List的區別,前者不允許有重復的元素,后者是可以的,所以CopyOnWriteSet的add和addAll兩個操作使用的是其內部CopyOnWriteList實例的addAbsent()和addAllAbsent()兩個防止重復元素的方法,addAbsent()實現是拷貝底層數據數組,然后逐一比較是否相同,如果有一個相同,那么直接返回false,說明插入失敗,如果和其他元素不同,那么將元素加入到新的數組中,最后置回新的數組, addAllAbsent()方法實現則是能有多少數據插入就插入,也就是說addAllAbsent一個集合的數據,可能只有一部分插入成功,另外一部分因為元素相同而遭丟棄,完成后返回插入的元素。