Java Concurrency Basics in Practice: Divide and Conquer
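This third article in the series uses a very simple task, finding the largest number in an array, as its example and presents four versions: 1. sequential execution; 2. based on the traditional Thread.join(); 3. based on Future from the concurrency utilities (java.util.concurrent); 4. based on the Fork/Join framework introduced in JDK 7. (Last updated 2013-10-25)

Divide-and-conquer is a common way to solve complex problems. In a concurrent application, a complex algorithm can be decomposed into several sub-problems, each solved by a separate thread. The sub-results are then merged level by level to obtain the final result. In suitable environments (for example, when several processor cores are otherwise idle), this approach can noticeably improve a program's execution efficiency.

Approach 1: Sequential Execution

Finding the maximum value in a given integer array is simple: traverse the elements one by one and compare each with the current maximum. If the element is larger, it becomes the new current maximum, which is then compared with the next element, and so on. Before writing concurrent programs for this algorithm, this article first gives a sequential version that nevertheless applies divide-and-conquer. The given array is split into several smaller sub-arrays, the maximum of each sub-array is found, and these maxima form a new array. The same splitting and merging is then applied to the new array, and so on, until the overall maximum is found.

MaxNumberFinder in Listing 1 is the base class for this article:
1. getMaxNumber() shows how to find the maximum of an array;
2. the utility methods getNumberArray()/createNumberArray() create an array of random integers of a given length;
3. findMaxNumber() shows how to split an array into sub-arrays (each no longer than THRESHOLD), find the maximum of each sub-array, and collect those maxima into successive interim arrays whose maxima are searched in turn.

Listing 1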
import java.util.Random;
public class MaxNumberFinder {
    // maximum length of a sub-array that is searched directly
    public static final int THRESHOLD = 50;
    private static final Random random = new Random();
    public static int[] getNumberArray() {
        return createNumberArray(3000);
    }
    // creates an array of `capacity` random integers in the range [0, capacity)
    private static int[] createNumberArray(int capacity) {
        int[] numbers = new int[capacity];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = random.nextInt(capacity);
        }
        return numbers;
    }
    // linear scan; returns Integer.MIN_VALUE for an empty array
    public static int getMaxNumber(int[] numbers) {
        if (numbers.length == 0) {
            return Integer.MIN_VALUE;
        }
        int max = numbers[0];
        for (int i = 1; i < numbers.length; i++) {
            if (numbers[i] > max) {
                max = numbers[i];
            }
        }
        return max;
    }
    private static int findMaxNumber(int[] numbers) {
        // interim max number array: one slot per sub-array, i.e. ceil(length / THRESHOLD)
        int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
        // sub-arrays of exactly THRESHOLD elements
        for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
            final int[] subNumbers = new int[THRESHOLD];
            System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
            maxNumbers[i / THRESHOLD] = getMaxNumber(subNumbers);
        }
        // the remaining, shorter sub-array (if any)
        if (numbers.length % THRESHOLD != 0) {
            int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
            System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
            maxNumbers[maxNumbers.length - 1] = getMaxNumber(lastSubNumbers);
        }
        // if the length of interim max number array is greater than threshold,
        // it must divide-and-search recursively.
        if (maxNumbers.length > THRESHOLD) {
            return findMaxNumber(maxNumbers);
        } else {
            return getMaxNumber(maxNumbers);
        }
    }
}
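Because findMaxNumber() in Listing 1 is private and prints nothing, it is not obvious how many levels of interim arrays it goes through. The following is a minimal standalone sketch (Java 8 or later, since it uses streams) that applies the same chunking rule, scanning index ranges with Arrays.stream instead of copying, and records the length of every level. The class ChunkedMaxDemo and its findMax() method are hypothetical names, not part of the original code; unlike Listing 1, the sketch returns directly once the input is no longer than THRESHOLD.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical demo of the level-by-level reduction in MaxNumberFinder.findMaxNumber().
public class ChunkedMaxDemo {
    // mirrors MaxNumberFinder.THRESHOLD
    static final int THRESHOLD = 50;

    // Returns the maximum of numbers and appends the length of every level
    // (the input array, then each interim max array) to levelSizes.
    static int findMax(int[] numbers, List<Integer> levelSizes) {
        levelSizes.add(numbers.length);
        if (numbers.length <= THRESHOLD) {
            // small enough: plain scan, Integer.MIN_VALUE for an empty array
            return Arrays.stream(numbers).max().orElse(Integer.MIN_VALUE);
        }
        // ceil(length / THRESHOLD) sub-arrays; the last one may be shorter
        int chunks = (numbers.length + THRESHOLD - 1) / THRESHOLD;
        int[] maxNumbers = new int[chunks];
        for (int c = 0; c < chunks; c++) {
            int from = c * THRESHOLD;
            int to = Math.min(from + THRESHOLD, numbers.length);
            maxNumbers[c] = Arrays.stream(numbers, from, to).max().getAsInt();
        }
        // search the interim array of maxima with the same rule
        return findMax(maxNumbers, levelSizes);
    }

    public static void main(String[] args) {
        int[] numbers = new Random(42).ints(3000, 0, 3000).toArray();
        List<Integer> levels = new ArrayList<>();
        int max = findMax(numbers, levels);
        System.out.println("levels: " + levels + ", max: " + max);
    }
}
```

For a 3000-element input, like the one produced by getNumberArray(), the recorded levels are [3000, 60, 2]: the first pass yields 60 sub-array maxima, which is still more than THRESHOLD (50), so one more pass reduces them to 2 values whose maximum is the result.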
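Approach 2: Based on Thread.join()

Building on MaxNumberFinder.findMaxNumber(), the maximum of every sub-array produced by a split, including the sub-arrays of each interim array of maxima, can be searched in a separate thread, as shown in Listing 2.

Listing 2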
import java.util.ArrayList;
import java.util.List;
public class MaxNumberFinderOnThread {
    private static final int THRESHOLD = MaxNumberFinder.THRESHOLD;
    private static int findMaxNumber(int[] numbers) throws InterruptedException {
        // interim max number array
        final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
        // the threads for searching max value in sub number array
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
            final int[] subNumbers = new int[THRESHOLD];
            System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
            final int bufIndex = i / THRESHOLD;
            Thread bufThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    maxNumbers[bufIndex] = MaxNumberFinder.getMaxNumber(subNumbers);
                }
            });
            bufThread.start();
            threads.add(bufThread);
        }
        if (numbers.length % THRESHOLD != 0) {
            final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
            System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
            final int lastIndex = (numbers.length - 1) / THRESHOLD;
            Thread lastThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    maxNumbers[lastIndex] = MaxNumberFinder.getMaxNumber(lastSubNumbers);
                }
            });
            lastThread.start();
            threads.add(lastThread);
        }
        // wait for all of the threads to finish; join() also guarantees that their
        // writes to maxNumbers are visible here. An interrupt is propagated rather
        // than swallowed, because continuing would merge an incomplete array.
        for (Thread thread : threads) {
            thread.join();
        }
        // if the length of interim max number array is greater than threshold,
        // it must divide-and-search recursively.
        if (maxNumbers.length > THRESHOLD) {
            return findMaxNumber(maxNumbers);
        } else {
            return MaxNumberFinder.getMaxNumber(maxNumbers);
        }
    }
}
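Listing 2 starts one thread per sub-array at every level, recursing while the interim array is longer than THRESHOLD. The hypothetical helper below (not part of the original code) counts how many threads MaxNumberFinderOnThread.findMaxNumber() would start in total for an input of length n, assuming exactly that chunking rule.

```java
// Hypothetical helper: counts the threads started by Listing 2 for an input of length n.
public class ThreadCountEstimate {
    static long threadsStarted(long n, int threshold) {
        long total = 0;
        long length = n;
        while (true) {
            // one thread per sub-array of at most `threshold` elements
            long chunks = (length + threshold - 1) / threshold;
            total += chunks;
            // Listing 2 recurses only while the interim array is longer than THRESHOLD
            if (chunks <= threshold) {
                return total;
            }
            length = chunks;
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsStarted(3_000, 50));     // prints 62 (60 + 2)
        System.out.println(threadsStarted(1_000_000, 50)); // prints 20408 (20000 + 400 + 8)
    }
}
```

With THRESHOLD = 50, the 3000-element array from getNumberArray() leads to 62 threads, whereas an array of one million elements would lead to 20408 threads, 20000 of them started in the first level alone.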
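To merge the maxima found by one level of threads into a new interim array of maxima, we must wait until all threads of that level have finished. Calling join() on each thread instance meets this requirement.

Note that if the array is long and each sub-array is short (that is, THRESHOLD is small), this approach creates a large number of threads, each with its own stack, and therefore consumes a lot of memory. In addition, every level has to wait for all of its worker threads before the next level can start, which may significantly reduce the efficiency of the whole task.

Approach 3: Based on Future

Approach 2 uses the legacy API. The same functionality can be implemented with Future from the concurrency utilities, discussed in the previous article of this series: simply replace Thread/Runnable with Future/Callable (submitted to an ExecutorService), as shown in Listing 3.

Listing 3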
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MaxNumberFinderOnFuture {
    private static final int THRESHOLD = MaxNumberFinder.THRESHOLD;
    // note: this pool is never shut down; a cached pool terminates threads that stay idle for 60 seconds
    private static final ExecutorService executor = Executors.newCachedThreadPool();
    private static int findMaxNumber(int[] numbers) throws InterruptedException, ExecutionException {
        final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
            final int[] subNumbers = new int[THRESHOLD];
            System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
            Future<Integer> bufFuture = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return MaxNumberFinder.getMaxNumber(subNumbers);
                }
            });
            futures.add(bufFuture);
        }
        if (numbers.length % THRESHOLD != 0) {
            final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
            System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
            Future<Integer> lastFuture = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return MaxNumberFinder.getMaxNumber(lastSubNumbers);
                }
            });
            futures.add(lastFuture);
        }
        // retrieve results from the Futures one by one; get() blocks until the
        // corresponding search is finished. Exceptions are propagated rather than
        // swallowed, which would silently leave 0 in maxNumbers.
        for (int i = 0, size = futures.size(); i < size; i++) {
            maxNumbers[i] = futures.get(i).get();
        }
        // if the length of interim max number array is greater than threshold,
        // it must divide-and-search recursively.
        if (maxNumbers.length > THRESHOLD) {
            return findMaxNumber(maxNumbers);
        } else {
            return MaxNumberFinder.getMaxNumber(maxNumbers);
        }
    }
}
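Listing 3 submits the tasks one at a time and then collects the Futures. ExecutorService also provides invokeAll(), which submits a whole collection of Callables and returns only after all of them have completed. The sketch below is a variant, not the author's approach: it assumes Java 8 (lambdas and streams), scans index ranges instead of copying sub-arrays, uses a fixed-size pool sized to the number of available processors (an arbitrary choice; any ExecutorService would do), and shuts the pool down when finished. The class name MaxNumberFinderOnInvokeAll is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MaxNumberFinderOnInvokeAll {
    // mirrors MaxNumberFinder.THRESHOLD
    static final int THRESHOLD = 50;

    static int findMaxNumber(ExecutorService executor, int[] numbers)
            throws InterruptedException, ExecutionException {
        // one Callable per sub-array of at most THRESHOLD elements
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int from = 0; from < numbers.length; from += THRESHOLD) {
            final int start = from;
            final int end = Math.min(from + THRESHOLD, numbers.length);
            // the tasks only read the shared array, so no copy is needed
            tasks.add(() -> Arrays.stream(numbers, start, end).max().getAsInt());
        }
        // invokeAll() returns only after every task has completed
        List<Future<Integer>> futures = executor.invokeAll(tasks);
        int[] maxNumbers = new int[futures.size()];
        for (int i = 0; i < maxNumbers.length; i++) {
            maxNumbers[i] = futures.get(i).get(); // already done, so this does not block
        }
        // same recursion rule as Listing 3
        if (maxNumbers.length > THRESHOLD) {
            return findMaxNumber(executor, maxNumbers);
        }
        return Arrays.stream(maxNumbers).max().orElse(Integer.MIN_VALUE);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            int[] numbers = new Random(1).ints(3000, 0, 3000).toArray();
            System.out.println(findMaxNumber(executor, numbers));
        } finally {
            executor.shutdown(); // stop accepting tasks so the pool's threads can terminate
        }
    }
}
```

Since invokeAll() already waits for the whole batch, the loop over the Futures only collects results; the blocking point moves from the individual get() calls into invokeAll() itself.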
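This version of findMaxNumber() does not call Thread.join() explicitly to wait for the threads. However, Future.get() blocks until the corresponding task has completed, so here it has the same effect as Thread.join().

Approach 4: Based on Fork/Join

Approaches 2 and 3 do not necessarily improve execution efficiency. If the application runs on a single-core processor, or does not make use of the multiple cores of a multi-core processor, it will very likely take longer than the sequential Approach 1, because creating, scheduling, and context-switching threads all add overhead.

To better suit today's ubiquitous multi-core processors, JDK 7 introduced the Fork/Join framework. As Listing 4 shows, with the RecursiveTask class provided by this framework a task can be split and merged directly, and the program itself becomes clearer and more concise.

Listing 4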
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class NumberFinderOnForkJoin extends RecursiveTask<Integer> {
    private static final long serialVersionUID = -5871813408961649666L;
    private static final int THRESHOLD = MaxNumberFinder.THRESHOLD;
    private int[] numbers = null;
    private int maxNumber = Integer.MIN_VALUE;
    public NumberFinderOnForkJoin(int[] numbers) {
        this.numbers = numbers;
    }
    @Override
    protected Integer compute() {
        if (numbers.length <= THRESHOLD) {
            maxNumber = MaxNumberFinder.getMaxNumber(numbers);
        } else {
            int[] leftNumbers = new int[numbers.length / 2];
            System.arraycopy(numbers, 0, leftNumbers, 0, leftNumbers.length);
            int[] rightNumbers = new int[numbers.length - numbers.length / 2];
            System.arraycopy(numbers, leftNumbers.length, rightNumbers, 0, rightNumbers.length);
            // divide the task into two sub-tasks.
            NumberFinderOnForkJoin leftTask = new NumberFinderOnForkJoin(leftNumbers);
            NumberFinderOnForkJoin rightTask = new NumberFinderOnForkJoin(rightNumbers);
            // invokeAll() forks both sub-tasks and returns once both have completed,
            // so the join() calls below return their results immediately
            invokeAll(leftTask, rightTask);
            maxNumber = Math.max(leftTask.join(), rightTask.join());
        }
        return maxNumber;
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        NumberFinderOnForkJoin task = new NumberFinderOnForkJoin(MaxNumberFinder.getNumberArray());
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(task);
        System.out.println(task.get());
        pool.shutdown();
    }
}
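Listing 4 copies each half into a new array before forking. A common alternative is to pass index bounds into the shared array and, as in the Fibonacci example of the RecursiveTask documentation, fork one sub-task, compute the other in the current thread, and then join the forked one. The sketch below applies that pattern to the maximum search; RangeMaxTask and findMax() are hypothetical names, and the threshold of 50 simply mirrors MaxNumberFinder.THRESHOLD.

```java
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Variant of Listing 4 that splits by index range instead of copying arrays.
public class RangeMaxTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    // mirrors MaxNumberFinder.THRESHOLD
    static final int THRESHOLD = 50;

    private final int[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeMaxTask(int[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            // small enough: linear scan (Integer.MIN_VALUE for an empty range)
            int max = Integer.MIN_VALUE;
            for (int i = from; i < to; i++) {
                max = Math.max(max, numbers[i]);
            }
            return max;
        }
        // same split as Listing 4: left half n/2, right half n - n/2
        int mid = from + (to - from) / 2;
        RangeMaxTask left = new RangeMaxTask(numbers, from, mid);
        RangeMaxTask right = new RangeMaxTask(numbers, mid, to);
        left.fork();                    // make the left half available to other workers
        int rightMax = right.compute(); // process the right half in this thread
        int leftMax = left.join();      // wait for the left half's result
        return Math.max(leftMax, rightMax);
    }

    static int findMax(ForkJoinPool pool, int[] numbers) {
        return pool.invoke(new RangeMaxTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        int[] numbers = new Random(5).ints(3000, 0, 3000).toArray();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            System.out.println(findMax(pool, numbers));
        } finally {
            pool.shutdown();
        }
    }
}
```

Working on index ranges avoids copying every element once per level of recursion, and computing one half in the current thread keeps that thread doing useful work instead of only waiting.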
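A brief note: Approach 4 splits the array differently from the first three approaches. Approach 4 uses bisection, whereas the first three take consecutive sub-arrays of at most THRESHOLD elements from front to back. There is no essential difference between the two; at most, the number of splits differs slightly. For example, take an array of length 9 with THRESHOLD set to 3. With bisection, the first split produces two sub-arrays of lengths 4 and 5, both still longer than THRESHOLD, so a second round of splitting is needed, which produces four sub-arrays of lengths 2, 2, 2 and 3. With the sequential method, a single split produces three sub-arrays of length 3, and no further splitting is needed.

Summary

Breaking a task into pieces and conquering them one by one is a standard technique for complex problems. When resources are plentiful, idle computing resources should be used as much as possible. The Java concurrency utilities provide execution frameworks suited to multi-core environments, allowing applications to use multi-core processors more efficiently.

However, the choice of approach, including the value of THRESHOLD, should still be based on performance testing. For the example in this article, Approach 1 was actually the fastest in my test environment. Among the concurrent approaches, Approach 2 was clearly slower than Approaches 3 and 4, while Approaches 3 and 4 performed about the same.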
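The two splitting strategies can be compared without performing any actual search. The hypothetical helper below records only the lengths of the leaf sub-arrays each strategy produces: bisect() follows the rule in Listing 4 (left half n/2, right half n - n/2, stop once a part is no longer than THRESHOLD), and sequentialChunks() follows Listings 1 to 3 (consecutive chunks of at most THRESHOLD elements).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: leaf sub-array lengths produced by the two splitting strategies.
public class SplitStrategies {
    // Listing 4: split into n/2 and n - n/2 until a part is <= threshold.
    static void bisect(int length, int threshold, List<Integer> leaves) {
        if (length <= threshold) {
            leaves.add(length);
            return;
        }
        int left = length / 2;
        bisect(left, threshold, leaves);
        bisect(length - left, threshold, leaves);
    }

    // Listings 1-3: consecutive chunks of at most `threshold` elements.
    static List<Integer> sequentialChunks(int length, int threshold) {
        List<Integer> chunks = new ArrayList<>();
        for (int from = 0; from < length; from += threshold) {
            chunks.add(Math.min(threshold, length - from));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> leaves = new ArrayList<>();
        bisect(9, 3, leaves);
        System.out.println("bisection:  " + leaves);                 // [2, 2, 2, 3]
        System.out.println("sequential: " + sequentialChunks(9, 3)); // [3, 3, 3]
    }
}
```

For length 9 and THRESHOLD 3 it reports [2, 2, 2, 3] for bisection (reached after two rounds of splitting: 9 into 4 and 5, then each of those in half) and [3, 3, 3] for the sequential method.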