java.util.concurrent提供了多種并發(fā)容器,總體上來說有4類,隊列類型的BlockingQueue和 ConcurrentLinkedQueue,Map類型的ConcurrentMap,Set類型的ConcurrentSkipListSet和CopyOnWriteArraySet,List類型的CopyOnWriteArrayList.
這些并發(fā)容器都采用了多種手段控制并發(fā)的存取操作,并且盡可能減小控制并發(fā)所帶來的性能損耗。接下來我們會對每一種類型的實現(xiàn)類進行代碼分析,進而得到java.util.con current包所提供的并發(fā)容器在傳統(tǒng)容器上所做的工作。
2. BlockingQueue
BlockingQueue接口定義的所有方法實現(xiàn)都是線程安全的,它的實現(xiàn)類里面都會用鎖和其他控制并發(fā)的手段保證這種線程安全,但是這些類同時也實現(xiàn)了Collection接口(主要是AbstractQueue實現(xiàn)),所以會出現(xiàn)BlockingQueue的實現(xiàn)類也能同時使用Conllection接口方法,而這時會出現(xiàn)的問題就是像addAll,containsAll,retainAll和removeAll這類批量方法的實現(xiàn)不保證線程安全,舉個例子就是addAll 10個items到一個ArrayBlockingQueue,可能中途失敗但是卻有幾個item已經被放進這個隊列里面了。
下面我們根據(jù)這幅類圖來逐個解析不同實現(xiàn)類的特性和特性實現(xiàn)代碼

DelayQueue提供了一個只返回超時元素的阻塞隊列,也就是說,即使隊列中已經有數(shù)據(jù)了,但是poll或者take的時候還要判定這個element有沒達到規(guī)定的超時時間,poll方法在element還沒達到規(guī)定的超時時間返回null,take則會通過condition.waitNanos()進入等待狀態(tài)。一般存儲的element類型為Delayed,這個接口JDK中實現(xiàn)的類有ScheduledFutureTask,而DelayQueue為DelayedWorkQueue的Task容器,后者是ScheduledThreadPoolExecutor的工作隊列,所以DelayQueue所具有的超時提供元素和線程安全特性對于并發(fā)的定時任務有很大的意義。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//控制并發(fā)
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
//condition協(xié)調隊列里面元素
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
//因為first在隊列里面的delay最短的(優(yōu)先隊列保證),所以wait這個時間那么隊列中最短delay的元素就超時了.即
//隊列有元素供應了.
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
DelayQueue的內部數(shù)據(jù)結構是PriorityQueue,因為Delayed接口同時繼承了Comparable接口,并且Delayed的實現(xiàn)類對于這個compareTo方法的實現(xiàn)是基于超時時間進行大小比較,所以DelayQueue無需關心數(shù)據(jù)的排序問題,只需要做好存取的并發(fā)控制(ReetranLock)和超時判定即可。另外,DelayQueue有一個實現(xiàn)細節(jié)就是通過一個Condition來協(xié)調隊列中是否有數(shù)據(jù)可以提供,這對于take和帶有提取超時時間的poll是有意義的(生產者,消費者的實現(xiàn))。final ReentrantLock lock = this.lock;
//控制并發(fā)
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
//condition協(xié)調隊列里面元素
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
//因為first在隊列里面的delay最短的(優(yōu)先隊列保證),所以wait這個時間那么隊列中最短delay的元素就超時了.即
//隊列有元素供應了.
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}
PriorityBlockingQueue實現(xiàn)對于外部而言是按照元素的某種順序返回元素,同時對存取提供并發(fā)保護(ReetranLock),使用Condition協(xié)調隊列是否有新元素提供。PriorityBlocking Queue內部的數(shù)據(jù)結構為PriorityQueue,優(yōu)先級排序工作交給PriorityQueue,至于怎么排序,需要根據(jù)插入元素的Comparable的接口實現(xiàn),和DelayQueue比起來,它沒有限定死插入數(shù)據(jù)的Comparable實現(xiàn),而DelayQueue的元素實現(xiàn)Comparable必須按照超時時間的長短進行比較,否則DelayQueue返回的元素就很可能是錯誤的。
ArrayBlockingQueue是一個先入先出的隊列,內部數(shù)據(jù)結構為一個數(shù)組,并且一旦創(chuàng)建這個隊列的長度是不可改變的,當然put數(shù)據(jù)時,這個隊列也不會自動增長。ArrayBlockingQueue也是使用ReetranLock來保證存取的原子性,不過使用了notEmpty和notFull兩個Condition來協(xié)調隊列為空和隊列為滿的狀態(tài)轉換,插入數(shù)據(jù)的時候,判定當前內部數(shù)據(jù)結構數(shù)組E[] items的長度是否等于元素計數(shù),如果相等,說明隊列滿,notFull.await(),直到items數(shù)組重新不為滿(removeAt,poll等),插入數(shù)據(jù)后notEmpty.sinal()通知所有取數(shù)據(jù)或者移除數(shù)據(jù)并且因為items為空而等待的線程可以繼續(xù)進行操作了。提取數(shù)據(jù)或者移除數(shù)據(jù)的過程剛好相反。
ArrayBlockingQueue使用三個數(shù)字來維護隊列里面的數(shù)據(jù)變更,包括takeIndex,putIndex,count,這里需要講一下takeIndex和putIndex,其中takeIndex指向下一個能夠被提取的元素,而putIndex指向下一個能夠插入數(shù)據(jù)的位置,實現(xiàn)類似下圖的結構,當takeIndex移到內部數(shù)組items最大長度時,重新賦值為0,也就是回到數(shù)組頭部,putIndex也是相同的策略.

















//put后,使用Condition通知正在等待take的線程可以做提取操作

















































//無論是offerLast,offerFirst,pollFirst,pollLast等等方法都會使用同一把鎖.
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkLast();
} finally {
lock.unlock();
}
}
3. ConcurrentMap
ConcurrentMap定義了V putIfAbsent(K key,V value),Boolean remove(Object Key,Object value),Boolean replace(K key,V oldValue,V newValue)以及V replace(K key,V value)四個方法,幾個方法的特性并不難理解,4個方法都是線程安全的。
ConcurrentHashMap是ConcurrentMap的一個實現(xiàn)類,這個類的實現(xiàn)相當經典,基本思想就是分拆鎖,默認ConcurrentHashMap會實例化一個持有16個Segment對象的數(shù)組,Segment數(shù)組大小是可以設定的,構造函數(shù)里的concurrencyLevel指定這個值,但是需要注意的是,這個值并不是直接賦值.Segment數(shù)組最大長度為MAX_SEGMENTS = 1 << 16












下面3段代碼是ConcurrentHashMap的初始化Segment,計算hash值,以及如何選擇Segment的代碼以及示例注解.
















































*外部用戶傳進來劣質的hash值(比如重復度很高)所帶來
*的危害.











Segment的put方法在lock()后,首先對數(shù)組長度加了新的元素之后是否會超過閾值threshold進行了判定,如果超過,那么進行rehash(),rehash()的過程相對繁瑣,首先數(shù)組會自動增長一倍,然后需要對HashEntry數(shù)組中的所有元素都需要重新計算hash值,并且置到新數(shù)組的新的位置,同時為了減小操作損耗,將原來不需要移動的數(shù)據(jù)不做移動操作(power-of-two expansion,在默認threshold,在數(shù)組擴大一倍時只需要移動1/6元素,其他都可以不動)。所有動作完成之后,通過一個while循環(huán)尋找Segment中是否有相同Key存在,如果已經存在,那么根據(jù)onlyIfAbsent參數(shù)確定是否替換(如果為true,不替換,如果為false,替換掉value),然后返回替換的value,如果不存在,那么新生成一個HashEntry,并且根據(jù)一開始計算出來的index放到數(shù)組指定位置,并且累積元素計數(shù),返回put的值。最后unlock()釋放掉鎖.
4. CopyOnWriteArrayList和CopyOnWriteArraySet
CopyOnWriteList是線程安全的List實現(xiàn),其底層數(shù)據(jù)存儲結構為數(shù)組(Object[] array),它在讀操作遠遠多于寫操作的場景下表現(xiàn)良好,這其中的原因在于其讀操作(get(),indexOf(),isEmpty(),contains())不加任何鎖,而寫操作(set(),add(),remove())通過Arrays.copyOf()操作拷貝當前底層數(shù)據(jù)結構(array),在其上面做完增刪改等操作,再將新的數(shù)組置為底層數(shù)據(jù)結構,同時為了避免并發(fā)增刪改, CopyOnWriteList在這些寫操作上通過一個ReetranLock進行并發(fā)控制。另外需要注意的是,CopyOnWriteList所實現(xiàn)的迭代器其數(shù)據(jù)也是底層數(shù)組鏡像,所以在CopyOnWriteList進行interator,同時并發(fā)增刪改CopyOnWriteList里的數(shù)據(jù)實不會拋“ConcurrentModificationException”,當然在迭代器上做remove,add,set也是無效的(拋UnsupportedOperationExcetion),因為迭代器上的數(shù)據(jù)只是當前List的數(shù)據(jù)數(shù)組的一個拷貝而已。
CopyOnWriteSet是一個線程安全的Set實現(xiàn),然后持有一個CopyOnWriteList實例,其所有的操作都是這個CopyOnWriteList實例來實現(xiàn)的。CopyOnWriteSet與CopyOnWriteList的區(qū)別實際上就是Set與List的區(qū)別,前者不允許有重復的元素,后者是可以的,所以CopyOnWriteSet的add和addAll兩個操作使用的是其內部CopyOnWriteList實例的addAbsent()和addAllAbsent()兩個防止重復元素的方法,addAbsent()實現(xiàn)是拷貝底層數(shù)據(jù)數(shù)組,然后逐一比較是否相同,如果有一個相同,那么直接返回false,說明插入失敗,如果和其他元素不同,那么將元素加入到新的數(shù)組中,最后置回新的數(shù)組, addAllAbsent()方法實現(xiàn)則是能有多少數(shù)據(jù)插入就插入,也就是說addAllAbsent一個集合的數(shù)據(jù),可能只有一部分插入成功,另外一部分因為元素相同而遭丟棄,完成后返回插入的元素。