Java并發基礎實踐--分而治之
本系列的第三篇文章將以實現一個極簡單的查找最大數的任務為例,分別給出了四個版本:1.順序執行;2.基于傳統的Thread.join();3.基于并發工具包的Future;4.基于JDK 7引入的Fork/Join框架。(2013.10.25最后更新)分而治之(Divide-and-Conquer)是解決復雜問題的常用方法。在并發應用中,可將一個復雜算法分解為若干子問題,然后使用獨立的線程去執行解決子問題的程序,之后再將各個子問題的結果逐級進行合并以得到最終結果。在特定環境中,這種方式可能較好地提高程序的執行效率。
方案1:順序執行
找出對給定整形數組中的最大值,使用的方法很簡單,就是逐一遍歷每個元素,將當前元素與當前最大值進行比較,若當前元素的值更大,則將該值作為新的當前最大值,再去與下一個元素進行比較,如此反復。在編寫并發程序來實現這個算法之前,本文將先給出一個順序執行的實現版本,但依然利用了分而治之的思想。即,先將給定數列分割成若干較小的子數列,找出各個子數列的最大值,將這些最大值組成一個新的數列,然后再使用同樣的方法對這個新數列進行分割與最大值合并,...依次類推,直至找到最大值。
代碼清單1中的MaxNumberFinder是本文的基礎類:1.getMaxNumber()方法展示了如何查找一個數列中的最大值;2.使用工具方法getNumberArray()/createNumberArray()可以創建指定長度的隨機整數數列;3.方法findMaxNumber()展示了將一個數列分割為子數列(子數列的最大長度不超過指定值THRESHOLD),并查找子數列的最大值,以及將子數列的最大值組成各級新的中間數列去查找其最大值。
清單1
public class MaxNumberFinder {
public static final int THRESHOLD = 50;
private static final Random random = new Random();
public static int[] getNumberArray() {
return createNumberArray(3000);
}
private static int[] createNumberArray(int capacity) {
int[] numbers = new int[capacity];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = random.nextInt(capacity);
}
return numbers;
}
public static int getMaxNumber(int[] numbers) {
if (numbers.length == 0) {
return Integer.MIN_VALUE;
}
int max = numbers[0];
for (int i = 1; i < numbers.length; i++) {
if (numbers[i] > max) {
max = numbers[i];
}
}
return max;
}
private static int findMaxNumber(int[] numbers) {
// interim max number array
int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
maxNumbers[i / THRESHOLD] = getMaxNumber(subNumbers);
}
if (numbers.length % THRESHOLD != 0) {
int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
maxNumbers[maxNumbers.length - 1] = getMaxNumber(lastSubNumbers);
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return getMaxNumber(maxNumbers);
}
}
}
public class MaxNumberFinder {
public static final int THRESHOLD = 50;
private static final Random random = new Random();
public static int[] getNumberArray() {
return createNumberArray(3000);
}
private static int[] createNumberArray(int capacity) {
int[] numbers = new int[capacity];
for (int i = 0; i < numbers.length; i++) {
numbers[i] = random.nextInt(capacity);
}
return numbers;
}
public static int getMaxNumber(int[] numbers) {
if (numbers.length == 0) {
return Integer.MIN_VALUE;
}
int max = numbers[0];
for (int i = 1; i < numbers.length; i++) {
if (numbers[i] > max) {
max = numbers[i];
}
}
return max;
}
private static int findMaxNumber(int[] numbers) {
// interim max number array
int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
maxNumbers[i / THRESHOLD] = getMaxNumber(subNumbers);
}
if (numbers.length % THRESHOLD != 0) {
int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
maxNumbers[maxNumbers.length - 1] = getMaxNumber(lastSubNumbers);
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return getMaxNumber(maxNumbers);
}
}
}
方案2:基于Thread.join()
基于方法MaxNumberFinder.findMaxNumber()的實現,在每次分割之后得到的數列,以及合并得到的中間最大值的數列,都可以使用獨立的線程去分別查找它們的最大值,如代碼清單2所示。
清單2
public class MaxNumberFinderOnThread {
private static int findMaxNumber(int[] numbers) {
// interim max number array
final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
// the threads for searching max value in sub number array
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
final int bufIndex = i / THRESHOLD;
Thread bufThread = new Thread(new Runnable() {
@Override
public void run() {
maxNumbers[bufIndex] = MaxNumberFinder.getMaxNumber(subNumbers);
}
});
bufThread.start();
threads.add(bufThread);
}
if (numbers.length % THRESHOLD != 0) {
final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
final int lastIndex = (numbers.length - 1) / THRESHOLD;
Thread lastThread = new Thread(new Runnable() {
@Override
public void run() {
maxNumbers[lastIndex] = MaxNumberFinder.getMaxNumber(lastSubNumbers);
}
});
threads.add(lastThread);
lastThread.start();
}
// waiting for all of jobs are finished
for (int i = 0, size = threads.size(); i < size; i++) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return MaxNumberFinder.getMaxNumber(maxNumbers);
}
}
}
為了能夠將同一級線程所找的最大值合并成一個新的中間最大值數列,必須要等待這一組線程全部執行完畢。而通過分別調用每個線程實例中join()方法即可滿足這一要求。public class MaxNumberFinderOnThread {
private static int findMaxNumber(int[] numbers) {
// interim max number array
final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
// the threads for searching max value in sub number array
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
final int bufIndex = i / THRESHOLD;
Thread bufThread = new Thread(new Runnable() {
@Override
public void run() {
maxNumbers[bufIndex] = MaxNumberFinder.getMaxNumber(subNumbers);
}
});
bufThread.start();
threads.add(bufThread);
}
if (numbers.length % THRESHOLD != 0) {
final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
final int lastIndex = (numbers.length - 1) / THRESHOLD;
Thread lastThread = new Thread(new Runnable() {
@Override
public void run() {
maxNumbers[lastIndex] = MaxNumberFinder.getMaxNumber(lastSubNumbers);
}
});
threads.add(lastThread);
lastThread.start();
}
// waiting for all of jobs are finished
for (int i = 0, size = threads.size(); i < size; i++) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return MaxNumberFinder.getMaxNumber(maxNumbers);
}
}
}
必須注意的是,若數列的很長,而每次最多處理的數列較短(即,THRESHOLD值較小),該方案將會產生較多的線程,消耗大量內存。另外,還會有較多的高層線程在等待低層線程的執行結果,這可能會大大影響整個任務的執行效率。
方案3:基于Future
方案2使用的是舊有API,根據本系列上一篇中所提及的并發工具包中的Future,同樣可以實現這一功能。只需要將使用Thread/Runnable的地方,相應地替換成使用Future/Callable即可,如代碼清單3所示。
清單3
public class MaxNumberFinderOnFuture {
private static ExecutorService executor = Executors.newCachedThreadPool();
private static int findMaxNumber(int[] numbers) {
final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
Future<Integer> bufFuture = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return MaxNumberFinder.getMaxNumber(subNumbers);
}
});
futures.add(bufFuture);
}
if (numbers.length % THRESHOLD != 0) {
final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
Future<Integer> lastFuture = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return MaxNumberFinder.getMaxNumber(lastSubNumbers);
}
});
futures.add(lastFuture);
}
// retrieve results from Futures one by one,
// get() method will be blocked if the searching isn't finished
for (int i = 0, size = futures.size(); i < size; i++) {
try {
maxNumbers[i] = futures.get(i).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return MaxNumberFinder.getMaxNumber(maxNumbers);
}
}
}
這一版的findMaxNumber()實現中,沒有顯示地調用Thread.join()方法去等待線程的執行結果。但在任務執行完成之前,調用Future.get()時會被阻塞,在此處的效果就與Thread.join()相同。public class MaxNumberFinderOnFuture {
private static ExecutorService executor = Executors.newCachedThreadPool();
private static int findMaxNumber(int[] numbers) {
final int[] maxNumbers = new int[numbers.length / THRESHOLD + (numbers.length % THRESHOLD == 0 ? 0 : 1)];
List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
for (int i = 0; i <= numbers.length - THRESHOLD; i += THRESHOLD) {
final int[] subNumbers = new int[THRESHOLD];
System.arraycopy(numbers, i, subNumbers, 0, subNumbers.length);
Future<Integer> bufFuture = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return MaxNumberFinder.getMaxNumber(subNumbers);
}
});
futures.add(bufFuture);
}
if (numbers.length % THRESHOLD != 0) {
final int[] lastSubNumbers = new int[numbers.length % THRESHOLD];
System.arraycopy(numbers, numbers.length - lastSubNumbers.length, lastSubNumbers, 0, lastSubNumbers.length);
Future<Integer> lastFuture = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return MaxNumberFinder.getMaxNumber(lastSubNumbers);
}
});
futures.add(lastFuture);
}
// retrieve results from Futures one by one,
// get() method will be blocked if the searching isn't finished
for (int i = 0, size = futures.size(); i < size; i++) {
try {
maxNumbers[i] = futures.get(i).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// if the length of interim max number array is greater than threshold,
// it must divide-and-search recursively.
if (maxNumbers.length > THRESHOLD) {
return findMaxNumber(maxNumbers);
} else {
return MaxNumberFinder.getMaxNumber(maxNumbers);
}
}
}
方案4:基于Fork/Join
方案2與方案3不一定能夠提高執行效率。如果該應用程序運行在單核處理器上,或者它沒有利用上多核處理器中的多個內核,那么它的執行時間很有可能要長于方案1所使用的順序執行方案,因為線程的創建、調度、上下文切換都會產生額外的開銷。
為了更好地適應已經十分普遍的多核處理器場景,JDK 7引入了Fork/Join框架。如代碼清單4所示,基于該框架提供的RecursiveTask,我們就可以直接地對任務進行分割與合并,程序本身也更為清晰簡潔。
清單4
public class NumberFinderOnForkJoin extends RecursiveTask<Integer> {
private static final long serialVersionUID = -5871813408961649666L;
private int[] numbers = null;
private int maxNumber = Integer.MIN_VALUE;
public NumberFinderOnForkJoin(int[] numbers) {
this.numbers = numbers;
}
@Override
public Integer compute() {
if (numbers.length <= THRESHOLD) {
maxNumber = NumberFinder.getMaxNumber(numbers);
} else {
int[] leftNumbers = new int[numbers.length / 2];
System.arraycopy(numbers, 0, leftNumbers, 0, leftNumbers.length);
int[] rightNumbers = new int[numbers.length - numbers.length / 2];
System.arraycopy(numbers, leftNumbers.length, rightNumbers, 0, rightNumbers.length);
// divide the task into two sub-tasks.
NumberFinderOnForkJoin leftTask = new NumberFinderOnForkJoin(leftNumbers);
NumberFinderOnForkJoin rightTask = new NumberFinderOnForkJoin(rightNumbers);
invokeAll(leftTask, rightTask);
maxNumber = Math.max(leftTask.maxNumber, rightTask.maxNumber);
}
return maxNumber;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
NumberFinderOnForkJoin task = new NumberFinderOnForkJoin(NumberFinder.getNumberArray());
ForkJoinPool pool = new ForkJoinPool();
pool.submit(task);
System.out.println(task.get());
pool.shutdown();
}
}
public class NumberFinderOnForkJoin extends RecursiveTask<Integer> {
private static final long serialVersionUID = -5871813408961649666L;
private int[] numbers = null;
private int maxNumber = Integer.MIN_VALUE;
public NumberFinderOnForkJoin(int[] numbers) {
this.numbers = numbers;
}
@Override
public Integer compute() {
if (numbers.length <= THRESHOLD) {
maxNumber = NumberFinder.getMaxNumber(numbers);
} else {
int[] leftNumbers = new int[numbers.length / 2];
System.arraycopy(numbers, 0, leftNumbers, 0, leftNumbers.length);
int[] rightNumbers = new int[numbers.length - numbers.length / 2];
System.arraycopy(numbers, leftNumbers.length, rightNumbers, 0, rightNumbers.length);
// divide the task into two sub-tasks.
NumberFinderOnForkJoin leftTask = new NumberFinderOnForkJoin(leftNumbers);
NumberFinderOnForkJoin rightTask = new NumberFinderOnForkJoin(rightNumbers);
invokeAll(leftTask, rightTask);
maxNumber = Math.max(leftTask.maxNumber, rightTask.maxNumber);
}
return maxNumber;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
NumberFinderOnForkJoin task = new NumberFinderOnForkJoin(NumberFinder.getNumberArray());
ForkJoinPool pool = new ForkJoinPool();
pool.submit(task);
System.out.println(task.get());
pool.shutdown();
}
}
稍稍說明一下,方案4對數列進行分割的方法與前三個方案都不同。方案4是使用的二分法,而前三個方案都是按從前往后的順序依次取出不長于THRESHOLD的子數列。但這兩種方法沒有本質上的區別,最多只是分割的次數略有不同罷了。例如,一個長度為9的數列,將THRESHOLD的值設為3。若用二分法,第一輪會分割出4個子數列,其長度分別為3,2,3和1,后面還需要進行一輪分割;但若用順序法,分割將得到的3個子數列,其長度均為3,之后不需要再進行分割了。
小結
分解任務,各個擊破,是應對復雜問題的慣用伎倆。在資源充足的情況下,應該盡可能地利用空閑的計算資源。Java并發工具包提供了適應多核環境的運行框架,使應用程序能更高效地利用多核處理器。
但對執行方案的選定,包括THRESHOLD的值,依然要基于性能測試。對于本文的例子,在我的測試環境中,方案1其實是最高的。在并發執行方案中,方案2會明顯慢于方案3和方案4,而方案3與方案4之間則難分伯仲。
posted on 2013-10-23 23:27 John Jiang 閱讀(4264) 評論(0) 編輯 收藏 所屬分類: JavaSE 、Java 、Concurrency 、原創 、Java并發基礎實踐