利用Java SE 8流處理數據
-- 使用Java流操作去表達復雜的數據查詢
-- 使用Java流操作去表達復雜的數據查詢
本文是Java Magazine 201403/04刊中的一篇文章,也是文章系列"利用Java SE 8流處理數據"中的第一篇,它概述了Java流的基本原理與基本應用,是一篇很好的Java Streams API的入門文章。(2014.07.27最后更新)
沒有集合對象,你會怎么樣?幾乎每個Java應用都會創建并處理集合。它們是許多編程任務的基礎:集合使你能夠對數據進行分組和處理。例如,你也許會創建一個關于銀行交易的集合,該集合代表了某個用戶的銀行流水清單。然后,你可能想要處理整個集合去算出該用戶花了多少錢。盡管數據處理十分重要,但Java在此方面表現的遠不完美。
首先,典型的集合處理模式與類SQL操作相似,諸如"查找"(如找出最大金額的那筆交易)或者"分組"(如,將與購買雜貨相關的交易進行分組)。大部分數據庫允許你以聲明的形式去指定這些操作。例如,后面的SQL查詢會讓你找到那筆最大金額交易的ID:"SELECT id, MAX(value) from transctions"。
如你所見,我們并不需要去實現如何計算最大值(例如,使用循環和一個變量去追蹤這個最大值)。我們僅需要表達什么是我們想要的。這種基本思想就意味著,你不太需要擔心去顯式地實現這些查詢--它們已經為你處理好了。為什么我們不能在處理集合時也這樣做呢?你發現自己有多少次都是在一遍又一遍地使用循環去重復實現這些操作呢?
其次,我們如何才能更高效地去處理大型集合?理想情況下,在加速進行處理時,你會想到利用多核架構。然而,編寫并行程序既困難又容易出錯。
Java SE 8趕來幫忙了!Java API的設計者們在升級API時引入了一種新的稱之為Java流(流)的抽象,它允許你以聲明形式去處理數據。另外,Java流可以利用到多核架構而不必編寫一行多線程代碼。聽起來不錯,不是嗎?這就是本文章系列所要探究的主題。
Java流能為我們做些什么呢?在探究這些細節之前,讓我們先看一個例子,這樣你才能對這種新的使用Java SE 8 Java流的編程風格有感覺。假設我們要找到所有類型為grocery的交易并返回它們的ID列表,并按交易金額的遞減順序對該列表進行排序。在Java SE 7中,我們應該會把清單1所示的程序那樣去做。而在Java SE 8中,我們則會像清單2所示的那樣去實現。
清單1
List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
if(t.getType() == Transaction.GROCERY){
groceryTransactions.add(t);
}
}
Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}
for(Transaction t: transactions){
if(t.getType() == Transaction.GROCERY){
groceryTransactions.add(t);
}
}
Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}
清單2
List<Integer> transactionsIds =
transactions.stream()
.filter(t -> t.getType() == Transaction.GROCERY)
.sorted(comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(toList());
transactions.stream()
.filter(t -> t.getType() == Transaction.GROCERY)
.sorted(comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(toList());
圖1形象地解釋了那段Java SE 8程序。首先,我們調用List對象中的Java流()方法從交易列表(數據)中獲取一個Java流對象。然后,多個操作(過濾,排序,映射,歸集)鏈接在一起形成了一條線,這條線可以被看作構成了一條數據查詢。

那么如何并行地執行該程序呢?在Java SE 8中這很簡單:只需要使用parallelJava流()方法去替換Java流()方法,如清單3所示。Java流 API會在內部對你的查詢進行解構,并利用上你機器中的多核處理器。
清單3
List<Integer> transactionsIds =
transactions.parallelStream()
.filter(t -> t.getType() == Transaction.GROCERY)
.sorted(comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(toList());
transactions.parallelStream()
.filter(t -> t.getType() == Transaction.GROCERY)
.sorted(comparing(Transaction::getValue).reversed())
.map(Transaction::getId)
.collect(toList());
在該關于Java SE 8 Java流的文章系列結束時,你將能夠使用Java流 API編寫出像清單3那樣的功能強大的查詢程序。
Java流入門
讓我們先從一點理論開始。Java流的定義是什么?一個簡短的定義就是"來自于一個數據源的能夠支持聚合操作的一串元素"。讓我們把它拆開來說:
一串元素:Java流為一串特定類型值的集合提供了一個接口。然后,Java流實際上并不存儲元素,它們會在需要時被用上。
數據源:Java流要使用一個提供數據的源,諸如集合對象,數組或I/O資源。
聚合操作:Java流支持類SQL的操作,以及來自于函數編程語言的通用操作,諸如過濾,映射,歸一,查找,匹配,排序,等等。
另外,與集合操作非常不同的是,Java流操作擁有兩項基本特質:
管道:許多Java流操作會返回它們自己,這就使得這些操作能夠鏈接在一起以組成一個大型管道。這樣就可以進行一些諸如惰性和短路之類的優化,后面我們會進行探究。
內部遍歷:集合是顯式地進行遍歷(外部遍歷),但不同于集合,Java流是在幕后進行遍歷。讓我們重新看看之前的示例代碼來解釋這些原理。圖2形象地解釋了清單2的更多細節。

首先,通過調用Java流()方法,我們從交易列表中得到了一個Java流對象。那么數據源就是交易列表,它將向Java流中提供一串元素。然后,我們對該Java流應用了一系列的聚合操作:過濾(提供一個謂語去過濾元素),排序(提供一個比較器去對元素進行排序),以及映射(解析出信息)。所有的操作都會返回該Java流,以便能夠鏈接這些操作去組成一個管道,這可被看作是對數據源的一個查詢。
在調用collect()操作之前,沒有實際工作會被執行。collect()方法將開始處理這個管道以返回一個結果(某個不是Java流的對象,在此處,是一個List對象)。現在還不需要去關注collect()方法,我們會在以后的文章去一探究竟。此時,你會發現collect會將各種數據加工方法作為參數,將收集到的Java流元素歸結為一個結果。此處,toList()就描述了一個將Java流對象轉化為List對象的加工方法。
在探究與Java流有關的各個方法之前,最好是停下來深入思考一下Java流和集合之間觀念上的不同之處。
Java流 vs. 集合
已有的Java集合概念與新的Java流概念都為一串元素提供了接口。那它們有何不同嗎?簡單地說,集合是關于數據的,而Java流是關于計算的。
想想這種情況,一部存儲在DVD中的電影。這就是一個集合(可能是字節,可能是幀--在此處,我們不必關心這些),因為它包含有全部的數據結構。現在再想想這種情況,這部電影被轉化成了數據流,通過互聯網去觀看它。此時它就是一個(字節或幀的)流。流視頻播放器只需要下載一些晚于用戶當前所觀看位置的幀就可以了。這樣,你就可以在大部分值被計算出來之前先展示流開頭處的值(想想流化一場現場直播的足球比賽)。
粗看之,集合與流的區別就是與何時處理數據有關。集合是內存中的數據結構,它包含有當前數據結構中的全部值--將所有元素加入到集合之前,必須先對所有元素進行處理,相反地,Java流只是邏輯上固定的數據結構,它里面的元素只會根據需要進行處理。
使用Collection接口,要求用戶實現遍歷(例如,使用增強的for循環,即foreach);這被稱之為外部循環。相反地,Stream類庫使用內部遍歷--它已經為你實現好了遍歷,它會關心存儲流的結果值的位置;你僅需要提供一個函數告訴它要做些什么就行了。清單4(對集合的外部遍歷)和清單5(對Java流的內部遍歷)中的代碼形象地展示了這一不同之處。
清單4
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: transactions){
transactionIds.add(t.getId());
}
for(Transaction t: transactions){
transactionIds.add(t.getId());
}
清單5
List<Integer> transactionIds =
transactions.stream()
.map(Transaction::getId)
.collect(toList());
transactions.stream()
.map(Transaction::getId)
.collect(toList());
在清單4中,我們顯式且順序地遍歷了交易列表,抽取了每個交易ID,然后將它加到一個收集器中。相反地,當使用流時,沒有顯式的遍歷。清單5中的代碼構建了一個查詢,其中的map操作被設定為一個參數,它會抽取交易ID,然后collect操作會把結果Stream對象轉化成一個List對象。
你現在應該知道什么是Java流,以及如何去使用它。現在讓我們看看Java流所支持的操作之間的區別,這樣你就能構建自己的數據查詢了。
Java流操作:使用流去處理數據
java.util.stream.Stream接口定義了許多操作,它們可被歸集為兩類。在圖1所示的例子中,你可以看到如下操作:
過濾,排序和映射,它們可被連接在一起組成一個管道
收集,它關閉了這個管道并返回結果
能夠被連接在一起的Java流操作被稱為中間操作。這些操作之所以能被連接在一起,是因為它們都會返回Stream對象。這些操作從這個管道中返回結果,結果的類型可以是List,Integer,甚至是void(任何Stream以外的類型)
你也許很好奇為什么這種區別很重要。是這樣的,在這個Java流管道的最終操作被調用之前,中間操作并不會執行任何處理;它們是"惰性"方法。這是因為中間方法經常會被"合并",在最終操作中它們會被合成為單一的執行路徑。
清單6
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
List<Integer> twoEvenSquares =
numbers.stream()
.filter(n -> {
System.out.println("filtering " + n);
return n % 2 == 0;
})
.map(n -> {
System.out.println("mapping " + n);
return n * n;
})
.limit(2)
.collect(toList());
List<Integer> twoEvenSquares =
numbers.stream()
.filter(n -> {
System.out.println("filtering " + n);
return n % 2 == 0;
})
.map(n -> {
System.out.println("mapping " + n);
return n * n;
})
.limit(2)
.collect(toList());
例如,思考下清單6中的程序,它是從給定的數列中計算奇數的平方。你可能會很驚訝,它打印出如下結果:
filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4
這是因為limit(2)使用了短路;我們只需要處理流的一部分,而不是全部,去得到一個結果。這就類似于測評一個由and操作符關聯起來的大型布爾表達式鏈:一旦某個表達式返回了false,那么就可以認為整個表達式鏈就是false,而不必測評所有的表達式了。在這個例子中,limit()方法將返回的Java流的長度限定為2。另外,filter與map被合并在了同一條執行路徑中了。
歸納一下到目前為止,在使用Java流時我們所學到的內容,總言之,涉及三個方面:
一個數據源(例如一個集合),對它執行查詢
一個中間操作的鏈,它組成一個流的管道
一個最終操作,它執行流的管道并產生結果
現在讓我們看看Java流所支持的一些操作。參考java.util.stream.Stream接口可以得到這些方法的完整清單,再看看本文末尾所給出的資源,它包含有更多的例子。
過濾。有多種方法可以對流中的元素進行過濾:
filter(Predicate):使用一個謂語(java.util.function.Predicate)作為參數,它會返回一個包含所有匹配給定謂語條件元素的Java流。
distinct:返回一個包含有唯一元素的Java流。
limit(n):返回的流的長度不能超過n。
skip(n):返回的流將不包括前n個元素。
查找與匹配。一個通用的數據處理模式就要考慮一些元素是否匹配給定的屬性。你可以使用anyMatch,allMatch和noneMatch方法幫你做到這一點。這些方法都會使用一個謂語參數并返回boolean值作為結果(所以,它們是最終操作)。例如,使用allMatch去查出交易流中所有金額大于100的交易,如清單7所示。
清單7
boolean expensive =
transactions.stream()
.allMatch(t -> t.getValue() > 100);
transactions.stream()
.allMatch(t -> t.getValue() > 100);
另外,Stream接口提供了方法findFirst和findAny,以取出流中任一元素。它們可以與其它的流操作,如filter,結合起來使用。findFirst和findAny都會返回一個Optinal對象(見清單8)。
清單8
Optional<Transaction> =
transactions.stream()
.findAny(t -> t.getType() == Transaction.GROCERY);
transactions.stream()
.findAny(t -> t.getType() == Transaction.GROCERY);
Optional<T>類(java.util.Optional)是一個容器類,它代表一個存在或不存在的值。清單8中的程序,findAny方法可能沒有找到任何類型為grocery的交易。Optional類包含多個方法去測試一個元素是否存在。例如,如果交易存在,通過使用ifPresent方法,我們可以選擇一個操作去應用這個Optaional對象,如清單9所示(此處只是打印交易)。
清單9
transactions.stream()
.findAny(t -> t.getType() == Transaction.GROCERY)
.ifPresent(System.out::println);
.findAny(t -> t.getType() == Transaction.GROCERY)
.ifPresent(System.out::println);
映射。Java流支持map方法,它使用一個函數(java.util.function.Function)作為參數,將流元素投影到其它形式。這個函數會被應用到每個元素,并將元素"映射"到新的元素。
例如,你可能會想到使用它去抽取流中每個元素的信息。在清單10的例子中,我們返回了一個列表中每個字的長度。
清單10
List<String> words = Arrays.asList("Oracle", "Java", "Magazine");
List<Integer> wordLengths =
words.stream()
.map(String::length)
.collect(toList());
List<Integer> wordLengths =
words.stream()
.map(String::length)
.collect(toList());
歸一。到目前為止,我們已見過的最終操作會返回boolean(allMatch等等),void(forEach)或Optaional對象(findAny等等)。我們也使用collect方法將Stream對象中的所有元素放到一個List對象中。
然而,你也可以將流中的元素放到一個查詢中,該查詢可表達更為復雜的數據處理,例如"擁有最大ID"或者"算出所以交易金額的和"。這就可能對Java流用上reduce方法,該方法會對每個元素重復地應用一個操作(例如,加上兩個數字),直到生成結果。在函數式編程中,這常被稱為折疊操作。因為該操作可被看作重復地"折疊"一張很長的紙(Stream對象),直到這張紙的面積變得只有一點兒了。這就是折疊操作的結果。
看看我們是如何使用循環去計算一個組數字的和會有助于理解這個問題:
int sum = 0;
for (int x : numbers) {
sum += x;
}
for (int x : numbers) {
sum += x;
}
列表中的每一個數字元素都被迭代地組合在一起,并使用一個額外的操作符去產生結果。本質上,我們就是把一組數字"歸一"成一個數字。在這段代碼中有兩個參數:數字和變量的初始值,即該例中的0,以及用于合并所有元素的操作符,即本例中的+。
清單11
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
對Java流使用reduce方法,我們可以計算出流中的所有元素值之和,如清單11所示。reduce方法使用兩個參數:
初始值,0
BinaryOperation<T>,合并兩個元素,并產生一個新值
reduce方法本質上就是重復應用模式的抽象。其它的查詢,如"計算產量"或"計算最大值"(如清單12所示)則是reduce方法的特別實例。
清單12
int product = numbers.stream().reduce(1, (a, b) -> a * b);
int product = numbers.stream().reduce(1, Integer::max);
int product = numbers.stream().reduce(1, Integer::max);
數字流
你已經看到可以使用reduce方法去計算整數流的和。然后,這也是有成本的:我們重復執行了許多拆箱操作以將Integer對象加到一起。如果我們能調用一個sum方法,使程序的意圖更為明顯,就像清單13那樣,豈不是更好?
清單13
int statement =
transactions.stream()
.map(Transaction::getValue)
.sum(); // error since Stream has no sum method
transactions.stream()
.map(Transaction::getValue)
.sum(); // error since Stream has no sum method
Java 8引入的三個特定的基本數據類型的流接口來應對這個問題--IntStream,DoubleStream和LongStream--它們專注于元素分別為int,double和long型的Java流。將一個流轉化為特定類型的流,你最常使用的方法就是mapToInt,mapToDouble和mapToLong。這些方法與我們較早前看到的map方法是一樣的,但它們會返回特定類型的Stream對象,而不是Stream<T>對象。例如,我們可以改進下清單13中的代碼,如清單14所示那樣。你也可以使用裝箱操作將一個基本數據類型的流轉化成一個使用包裝對象的流。
清單14
int statementSum =
transactions.stream()
.mapToInt(Transaction::getValue)
.sum(); // works!
transactions.stream()
.mapToInt(Transaction::getValue)
.sum(); // works!
最后,數字流的另一種有用的形式是數字區間。比如,你可能想生成介于1到100之間的所有數字。為了幫助生成這種區間,Java SE 8在IntStream,DoubleStream和LongStream中分別引入了兩個靜態方法:range和rangeClosed。
這兩個方法都會使用兩個參數,第一個參數是起始值,第二個參數是終止值。但是range方法生成的區間不會包含終止值本身,但rangeClosed生成的區間則會包含。清單15是一個使用rangeClosed方法的例子,它返回一個包含有全部介于10到30之間奇數的流。
清單15
IntStream oddNumbers =
IntStream.rangeClosed(10, 30)
.filter(n -> n % 2 == 1);
IntStream.rangeClosed(10, 30)
.filter(n -> n % 2 == 1);
構建流
有多種途徑可以去構建一個流。你已經看過如何從集合對象中構建流。另外,我們還操控過數字流。你也可以從值,數組或文件中去創建流。另外,你甚至于可以從一個函數中生成無限流。
可以直截了當地從值或數組中創建流:只需要使用一些靜態方法即可,對于值,是Stream.of();而對于數組,則要調用Arrays.stream()。如清單16所示。
清單16
Stream<Integer> numbersFromValues = Stream.of(1, 2, 3, 4);
int[] numbers = {1, 2, 3, 4};
IntStream numbersFromArray = Arrays.stream(numbers);
int[] numbers = {1, 2, 3, 4};
IntStream numbersFromArray = Arrays.stream(numbers);
你也可以將一個文件轉化為其內容行的流,使用靜態方法Files.lines()即可。清單17就使用該方法計算了文件中行的數量。
清單17
long numberOfLines =
Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
.count();
Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
.count();
無限流。最后,在總結本文之前,有一個令人非常興奮的主意。到現在為止,你應該理解到流中的元素是按需生成的。有兩個靜態方法--Stream.iterate()和Stream.generate()--可以讓你從一個函數中創建流。然而,因為被使用的元素是按需生成的,所以這兩個方法可以"永遠地"生成元素。這就是為什么我們稱它為無限流:它就是沒有固定大小的流,但它做的事情與一個從固定集合生成的流是一樣的。
清單18就是一個使用iterate方法的例子,它會包含10的所有倍數。iterate方法使用一個起始值(此處的0)和一個Lambda表達式(類型為UnaryOperator<T>)去順序地生成每一個新值。
清單18
Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);
我們也可以使用limit方法,以從一個無限流中得到一個固定流。如清單19所示,可以將流的長度限制為5。
清單19
numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40
結論
Java SE 8引入了Streams API,它讓你能夠表達更為復雜的數據處理查詢。在本文中,你已見到流可以支持許多操作,諸如過濾,映射,歸一和迭代,把它們結合在一起可以寫出簡潔的、更富表現力的數據處理查詢。這種新的編程方法遠不同于Java SE 8之前的集合處理。但是,它有許多好處。首先,它利用到了諸如惰性或短路這樣的技術,以優化數據處理查詢的性能。其次,能夠自動地利用上多核架構,以并行地處理流。在本文章系統的第二部分中,我們將探索更高級的操作,例如flatMap和collect。請繼續關注。