移植jQuery deferred到java,基于java的promise編程模型
很多語言都支持promise編程模型,像是scala中promise類和jquery(javascript)中的deferred對(duì)象等,在java中好像缺少相關(guān)實(shí)現(xiàn)。筆者不得以,只能自己動(dòng)手弄了一個(gè)。最后選擇將jquery中的deferred對(duì)象移植到java中來的方案。目前已經(jīng)應(yīng)用在企業(yè)級(jí)項(xiàng)目的高性能服務(wù)器和android客戶端等項(xiàng)目中。
Promise編程模型的概念這里也不再贅述,大家自己上網(wǎng)查找即可。這種編程模型主要解決的問題就是“同步調(diào)用變異步的問題”,通常解決異步調(diào)用的方式是使用“回調(diào)”。但普通回調(diào)的使用在代碼書寫,返回值傳遞和“異步方法編排上”非常的不方便。所以才會(huì)有Promise模型的誕生。
這次會(huì)介紹java版的deferred對(duì)象的使用方法,以及用jquery版之間的變化和改進(jìn)。目前開放的版本是基于線程池的版本,正在開發(fā)基于akka的版本。在jquery的實(shí)現(xiàn)中,因?yàn)?/span>javascript是單線程的,所以不用考慮線程同步的問題。在java線程池的版的deferred里,基于多線程環(huán)境做了很多測(cè)試,保證了線程安全及可靠性。
一. 基本調(diào)用形式
final Deferred def = new Deferred (App. executor);
執(zhí)行某個(gè)異步調(diào)用,比如某個(gè)基于網(wǎng)絡(luò)的異步服務(wù)
callService(new Response(){
public void onMessage(Object message){
def.resolve(message);
}
Public void onFail(Exception e){
def.reject(e);
}
});
你可以在構(gòu)造Deferred 對(duì)象后的任意時(shí)候,使用def的then方法。比如
def.then(new Reply(){
public Object done(Object d) {
System.out.println("response:"+d);
return d;
}
public void fail(Object f) {
System.out.println("error:"+f);
}
});
一個(gè)經(jīng)常遇到的場(chǎng)景是callService后將def作為參數(shù)傳遞到其他方法,在其他方法內(nèi)部再?zèng)Q定def要綁定什么樣的后續(xù)動(dòng)作,也就是綁定什么樣的then。
注意then方法的定義public Object done(Object d),在實(shí)際使用中done通常是以“處理鏈”的方式來使用的,即你會(huì)看到def.then().then().then()…這樣的方式,每一個(gè)then的done方法接收的參數(shù)都是其上一個(gè)then的done方法的返回值。通常作為參數(shù)傳遞給某個(gè)方法的Deferred上面已經(jīng)綁定了一些默認(rèn)的then對(duì)象,來處理一些必要的步驟。比如對(duì)接收?qǐng)?bào)文的初步解碼。
注意同在Reply接口中fail方法是沒有返回值的,一旦異步處理鏈上的某個(gè)Deferred被reject,其本身及后面所有的Deferred綁定的then都會(huì)被觸發(fā)fail方法。這保證了整個(gè)業(yè)務(wù)編排上或是你精心設(shè)計(jì)的算法編排上任意一個(gè)環(huán)節(jié),無論如何都會(huì)得到響應(yīng),這也是Promise模型關(guān)于異常的最重要的處理方式。
Promise編程模型本身是強(qiáng)健的,但異步服務(wù)卻不是總能得到響應(yīng)。在實(shí)際應(yīng)用中,每一個(gè)作為計(jì)算或業(yè)務(wù)環(huán)節(jié)的Deferred都應(yīng)該被定時(shí)輪詢,以保證在異步服務(wù)徹底得不到響應(yīng)的時(shí)候(比如你執(zhí)行了一個(gè)數(shù)據(jù)庫查詢,但過了很長很長時(shí)間仍沒有得到回應(yīng)),可以給Deferred對(duì)象reject一個(gè)超時(shí)錯(cuò)誤。
響應(yīng)處理對(duì)象then中方法done和fail都是不允許拋出任何異常的,特別是done方法,如果你的算法依賴異常,請(qǐng)?jiān)?/span>done中加上try…catch,并將異常傳換成下一個(gè)then可以理解的信息,以便這個(gè)Deferred處理鏈中可以正常執(zhí)行下去。
二. pipe到另外一個(gè)異步處理流程上去
假如你有如下的業(yè)務(wù)場(chǎng)景,你需要順序調(diào)用三個(gè)異步的webservice服務(wù)來得到最終的返回結(jié)果,其中沒個(gè)webservice的入?yún)⒍己蜕弦粋€(gè)的異步返回結(jié)果相關(guān)。(注意,異步的webservice是調(diào)用之后,服務(wù)端立刻返回,服務(wù)端處理完成后再主動(dòng)訪問剛才的請(qǐng)求方返回結(jié)果的方式)如果將這種webservice調(diào)用封裝成同步方法無疑在編程上是非常方便的,可以使用我們平常寫程序時(shí)順序的書寫方式,比如
reval1 = callwebservice1(param0)
reval2 = callwebservice2(reval1)
reval3 = callwebservice3(reval2)
方便的同時(shí)卻犧牲了性能。調(diào)用線程要在callwebservice方法內(nèi)阻塞,以等待異步返回。這樣的編程方法無法滿足高性能及高并發(fā)的需要。那么有沒有既能類似于平常寫程序時(shí)順序的書寫方式又能滿足異步無阻塞的需要呢,這就是Promise編程模型本身要解決的最大問題。
通常解決這種問題的方式是使用pipe,pipe這個(gè)方法名稱的由來應(yīng)該是來自于linux shell的管道符,即“|”
使用Deferred對(duì)象的解決方案類似于如下:
Deferred.resolvedDeferred(App.executor,param0).pipe(new AsyncRequest2(){
public void apply(Object param0,final Deferred newDefered) throws Exception{
asyncCallwebservice1(param0).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).pipe(new AsyncRequest2(){
public void apply(Object reval1,final Deferred newDefered) throws Exception{
asyncCallwebservice2(reval1).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).pipe(new AsyncRequest2(){
public void apply(Object reval2,final Deferred newDefered) throws Exception{
asyncCallwebservice3(reval3).onResponse(new Response(){
public void onMessage(String message){
newDefered.resolve(message);
}
});
}
}).then(new new Reply(){
public Object done(Object d) {
//在這里消費(fèi)最終結(jié)果
return d;
}
public void fail(Object f) {
}
});
使用Deferred對(duì)象提供的方案好處就是,所有的調(diào)用都是異步的,上面這一連串代碼立刻就會(huì)返回。所有的業(yè)務(wù)編排會(huì)按照書寫順序在線程池中的線程里被調(diào)用,你也不必?fù)?dān)心返回值結(jié)果和參數(shù)傳遞過程中的線程安全問題,框架在關(guān)鍵位置都做了同步,也做了相當(dāng)多的測(cè)試用于驗(yàn)證。
可以看出,對(duì)于異步方法調(diào)用而言,比較難以解決的問題是異步算法的編排問題。Deferred對(duì)象為異步算法提供了很好的解決方案。
相較于AsyncRequest2類還有一個(gè)AsyncRequest1類,接口如下:
public interface AsyncRequest1<R> {
public Deferred apply(R result) throws Exception;
}
這個(gè)類要求在在apply方法中要自己創(chuàng)建Deferred對(duì)象。
三. 一些小改進(jìn)
相較于傳統(tǒng)promise編程模型,在java多線程環(huán)境下做了一些小升級(jí)。這里主要介紹synchronize方法
Synchronize方法簽名如下:
Deferred synchronize(ExecutorService executor,Deferred... deferreds)
實(shí)際上,synchronize方法將眾多的Deferred對(duì)象的完成狀態(tài)同歸集到一個(gè)唯一的Deferred對(duì)象上去,即如果所有的Deferred對(duì)象參數(shù)都resolved了,作為最終結(jié)果的Deferred也resolve,如果眾多的Deferred對(duì)象參數(shù)有一個(gè)reject了,最終的那個(gè)Deferred也會(huì)立即reject(其他參數(shù)的狀態(tài)都舍棄)。
這個(gè)方法一般用于多個(gè)并行流程最終狀態(tài)的“歸并”中。
除了synchronize,框架還提供一些傳統(tǒng)promise編程模型沒有的改進(jìn),比如pipe4fail和source等。
四.在android項(xiàng)目中的應(yīng)用
(略)
https://github.com/jonenine/javaDeferred
https://github.com/jonenine/HST
雖然大數(shù)據(jù)的發(fā)展已經(jīng)將近10個(gè)年頭了,hadoop技術(shù)仍然沒有過時(shí),特別是一些低成本,入門級(jí)的小項(xiàng)目,使用hadoop還是蠻不錯(cuò)的。而且,也不是每一個(gè)公司都有能力招聘和培養(yǎng)自己的spark人才。
我本人對(duì)于hadoop mapreduce是有一些意見的。hadoop mapreduce技術(shù)對(duì)于開發(fā)人員的友好度不高,程序難寫,調(diào)試?yán)щy,對(duì)于復(fù)雜的業(yè)務(wù)邏輯遠(yuǎn)沒有spark得心應(yīng)手。
2016年的春節(jié)前接到一個(gè)任務(wù),要在一個(gè)沒有spark的平臺(tái)實(shí)現(xiàn)電力系統(tǒng)的一些統(tǒng)計(jì)分析算法,可選的技術(shù)只有hadoop mapreduce。受了這個(gè)刺激之后產(chǎn)生了一些奇思妙想,然后做了一些試驗(yàn),并最終形成HST---hadoop simplize toolkit,還真是無心載柳柳成蔭啊。
HST基本優(yōu)點(diǎn)如下:
屏蔽了hadoop數(shù)據(jù)類型,取消了driver,將mapper和reducer轉(zhuǎn)化為transformer和joiner,業(yè)務(wù)邏輯更接近sql。相當(dāng)程度的減少了代碼量,極大的降低了大數(shù)據(jù)編程的門檻,讓基層程序員通過簡單的學(xué)習(xí)即可掌握大數(shù)據(jù)的開發(fā)。
克服了hadoop mapreduce數(shù)據(jù)源單一的情況,比如在一個(gè)job內(nèi),input可以同時(shí)讀文件和來自不同集群的hbase。
遠(yuǎn)程日志系統(tǒng),讓mapper和reducer的日志集中到driver的控制臺(tái),極大減輕了并行多進(jìn)程程序的調(diào)試難度。
克服了hadoop mapreduce編寫業(yè)務(wù)邏輯時(shí),不容易區(qū)分?jǐn)?shù)據(jù)來自哪個(gè)數(shù)據(jù)源的困難。接近了spark(或者sql)的水平。
天生的多線程執(zhí)行,即在mapper和reducer端都默認(rèn)使用多線程來執(zhí)行業(yè)務(wù)邏輯。
對(duì)于多次迭代的任務(wù),相連的兩個(gè)任務(wù)可以建立關(guān)聯(lián),下一個(gè)任務(wù)直接引用上一個(gè)任務(wù)的結(jié)果,使多次迭代任務(wù)的代碼結(jié)構(gòu)變得清晰優(yōu)美。
以下會(huì)逐條說明
基本概念的小變化:
Source類代替了hadoop Input體系(format,split和reader)
Transformer代替了mapper
Joiner代替了Reducer
去掉了飽受詬病的Driver,改為內(nèi)置的實(shí)現(xiàn),現(xiàn)在完全不用操心了。
1. 基本上,屏蔽了hadoop的數(shù)據(jù)類型,使用純java類型
在原生的hadoop mapreduce開發(fā)中,使用org.apache.hadoop.io包下的各種hadoop數(shù)據(jù)類型,比如hadoop的Text類型,算法的編寫中一些轉(zhuǎn)換非常不方便。而在HST中一律使用java基本類型,完全屏蔽了hadoop類型體系。
比如在hbase作為source(Input)的時(shí)候,再也不用直接使用ImmutableBytesWritable和Result了,HST為你做了自動(dòng)的轉(zhuǎn)換。
現(xiàn)在的mapper(改名叫Transformer了)風(fēng)格是這樣的
public static class TransformerForHBase0 extends HBaseTransformer<Long>
…
現(xiàn)在map方法叫flatmap,看到?jīng)],已經(jīng)幫你自動(dòng)轉(zhuǎn)成了string和map
public void flatMap(String key, Map<String, String> row,
Collector<Long> collector)
可閱讀xs.hadoop.iterated.IteratedUtil類中關(guān)于類型自動(dòng)轉(zhuǎn)換的部分
2. 克服了hadoop mapreduce數(shù)據(jù)源單一的情況。比如在一個(gè)job內(nèi),數(shù)據(jù)源同時(shí)讀文件和hbase,這在原生的hadoop mapreduce是不可能做到的
以前訪問hbase,需要使用org.apache.hadoop.hbase.client.Scan和TableMapReduceUtil,現(xiàn)在完全改為與spark相似的方式。
現(xiàn)在的風(fēng)格是這樣的:
Configuration conf0 = HBaseConfiguration.create();
conf0.set("hbase.zookeeper.property.clientPort", "2181");
conf0.set("hbase.zookeeper.quorum", "172.16.144.132,172.16.144.134,172.16.144.136");
conf0.set(TableInputFormat.INPUT_TABLE,"APPLICATION_JOBS");
conf0.set(TableInputFormat.SCAN_COLUMN_FAMILY,"cf");
conf0.set(TableInputFormat.SCAN_CACHEBLOCKS,"false");
conf0.set(TableInputFormat.SCAN_BATCHSIZE,"20000");
...其他hbase的Configuration,可以來自不同集群。
IteratedJob<Long> iJob = scheduler.createJob("testJob")
.from(Source.hBase(conf0), TransformerForHBase0.class)
.from(Source.hBase(conf1), TransformerForHBase1.class)
.from(Source.textFile("file:///home/cdh/0.txt"),Transformer0.class)
.join(JoinerHBase.class)
Hadoop中的input,現(xiàn)在完全由source類來代替。通過內(nèi)置的機(jī)制轉(zhuǎn)化為inputformat,inputsplit和reader。在HST的框架下,其實(shí)可以很容易的寫出諸如Source.dbms(),Source.kafka()以及Source.redis()方法。想想吧,在一個(gè)hadoop job中,你終于可以將任意數(shù)據(jù)源,例如來自不同集群的HBASE和來自數(shù)據(jù)庫的source進(jìn)行join了,這是多么happy的事情啊!
3. 遠(yuǎn)程日志系統(tǒng)。讓mapper和reducer的日志集中在driver進(jìn)行顯示,極大減輕了了并行多進(jìn)程程序的調(diào)試難度
各位都體驗(yàn)過,job fail后到控制臺(tái)頁面,甚至ssh到計(jì)算節(jié)點(diǎn)去查看日志的痛苦了吧。對(duì),hadoop原生的開發(fā),調(diào)試很痛苦的呢!
現(xiàn)在好了,有遠(yuǎn)程日志系統(tǒng),可以在調(diào)試時(shí)將mapper和reducer的日志集中在driver上,錯(cuò)誤和各種counter也會(huì)自動(dòng)發(fā)送到driver上,并實(shí)時(shí)顯示在你的控制臺(tái)上。如果在eclipse中調(diào)試程序,就可以實(shí)現(xiàn)點(diǎn)擊console中的錯(cuò)誤,直接跳到錯(cuò)誤代碼行的功能嘍!
Ps:有人可能會(huì)問,如何在集群外使用eclipse調(diào)試一個(gè)job,卻可以以集群方式運(yùn)行呢?這里不再贅述了,網(wǎng)上有很多答案的哦
4. 克服了hadoop mapreduce在join上,區(qū)分?jǐn)?shù)據(jù)來自哪個(gè)數(shù)據(jù)源的困難,接近spark(或者sql)的水平
在上面給出示例中,大家都看到了,現(xiàn)在的mapper可以綁定input嘍!,也就是每個(gè)input都有自己獨(dú)立的mapper。正因?yàn)榇耍F(xiàn)在的input和mapper改名叫Source和Transformer。
那么,大家又要問了,在mapper中,我已經(jīng)可以輕松根據(jù)不同的數(shù)據(jù)輸入寫出不同的mapper了,那reducer中怎么辦,spark和sql都是很容易實(shí)現(xiàn)的哦?比如看人家sql
Select a.id,b.name from A a,B b where a.id = b.id
多么輕松愉悅啊!
在原生hadoop mapreduce中,在reducer中找出哪個(gè)數(shù)據(jù)對(duì)應(yīng)來自哪個(gè)input可是一個(gè)令人抓狂的問題呢!
現(xiàn)在這個(gè)問題已經(jīng)被輕松解決嘍!看下面這個(gè)joiner,對(duì)應(yīng)原生的reducer
public static class Joiner0 extends Joiner<Long, String, String>
…
Reduce方法改名叫join方法,是不是更貼近sql的概念呢?
public void join(Long key,RowHandler handler,Collector collector) throws Exception{
List<Object> row = handler.getSingleFieldRows(0);//對(duì)應(yīng)索引為0的source
List<Object> row2 = handler.getSingleFieldRows(1);//對(duì)應(yīng)第二個(gè)定義的source
注意上面兩句,可以按照數(shù)據(jù)源定義的索引來取出來自不同數(shù)據(jù)源join后的數(shù)據(jù)了,以后有時(shí)間可能會(huì)改成按照別名來取出,大家看源碼的時(shí)候,會(huì)發(fā)現(xiàn)別名這個(gè)部分的接口都寫好了,要不你來幫助實(shí)現(xiàn)了吧。
5. 天生的多線程執(zhí)行,即在mapper和reducer端都默認(rèn)使用多線程來執(zhí)行業(yè)務(wù)邏輯。
看看源碼吧,HST框架是并發(fā)調(diào)用flatMap和join方法的,同時(shí)又不能改變系統(tǒng)調(diào)用reduce方法的順序(否則hadoop的辛苦排序可就白瞎了),這可不是一件容易的事呢!
看到這里,有的同學(xué)說了。你這個(gè)HST好是好,但你搞的自動(dòng)轉(zhuǎn)換類型這個(gè)機(jī)制可能會(huì)把性能拉下來的。這個(gè)嗎,不得不承認(rèn),可能是會(huì)有一點(diǎn)影響。但在生產(chǎn)環(huán)境做的比對(duì)可以證明,影響太小了,基本忽略不計(jì)。
筆者在生產(chǎn)環(huán)境做了做了多次試驗(yàn),mapper改成多線程后性能并未有提高,特別是對(duì)一些業(yè)務(wù)簡單的job,增加Transformer中的并發(fā)級(jí)別效率可能還會(huì)下降。
很多同學(xué)喜歡在mapper中做所謂“mapper端的join”。這種方式,相信在HST中通過提高mapper的并發(fā)級(jí)別后會(huì)有更好的表現(xiàn)。
Reducer中的性能相對(duì)原生提升的空間還是蠻大的。大部分的mapreduce項(xiàng)目,都是mapper簡單而reducer復(fù)雜,HST采用并發(fā)執(zhí)行join的方式對(duì)提升reducer性能是超好的。
6. 對(duì)于多次迭代的任務(wù),相連的兩個(gè)任務(wù)可以建立關(guān)聯(lián),在流程上的下一個(gè)job直接引用上一個(gè)job的結(jié)果,使多次迭代任務(wù)的代碼結(jié)構(gòu)變得清晰優(yōu)美
雖然在最后才提到這一點(diǎn),但這卻是我一開始想要寫HST原因。多次迭代的任務(wù)太麻煩了,上一個(gè)任務(wù)要寫在hdfs做存儲(chǔ),下一個(gè)任務(wù)再取出使用,麻煩不麻煩。如果都由程序自動(dòng)完成,豈不美哉!
在上一個(gè)任務(wù)里format一下
IteratedJob<Long> iJob = scheduler.createJob("testJob")
...//各種source定義
.format("f1","f2")
在第二個(gè)任務(wù)中,直接引用
IteratedJob<Long> stage2Job = scheduler.createJob("stage2Job")
.fromPrevious(iJob, Transformer2_0.class);
//Transformer2_0.class
public static class Transformer2_0 extends PreviousResultTransformer<Long>
...
public void flatMap(Long inputKey, String[] inputValues,Collector<Long> collector) {
String f1 = getFiledValue(inputValues, "f1");
String f2 = getFiledValue(inputValues, "f2");
看到?jīng)],就是這么簡單。
在最開始的計(jì)劃中,我還設(shè)計(jì)了使用redis隊(duì)列來緩沖前面job的結(jié)果,供后面的job作為輸入。這樣本來必須嚴(yán)格串行的job可以在一定程度上并發(fā)。另外還設(shè)計(jì)了子任務(wù)的并發(fā)調(diào)度,這都留給以后去實(shí)現(xiàn)吧。
7. 便捷的自定義參數(shù)傳遞。
有時(shí)候,在業(yè)務(wù)中需要作一些“開關(guān)變量”,在運(yùn)行時(shí)動(dòng)態(tài)傳入不同的值以實(shí)現(xiàn)不同的業(yè)務(wù)邏輯。這個(gè)問題HST框架其實(shí)也為你考慮到了。
Driver中的自定義參數(shù),source中的自定義參數(shù)都會(huì)以內(nèi)置的方式傳到transformer或joiner中去,方便程序員書寫業(yè)務(wù)。
查看transformer或joiner的源碼就會(huì)發(fā)現(xiàn):
getSourceParam(name)和getDriverParam(pIndex)方法,在計(jì)算節(jié)點(diǎn)輕松的得到在driver和source中設(shè)置的各層次級(jí)別的自定義參數(shù),爽吧!
8. 其他工具
HST提供的方便還不止以上這些,比如在工具類中還提供了兩行數(shù)據(jù)(map類型)直接join的方法。這些都留給你自己去發(fā)現(xiàn)并實(shí)踐吧!
https://github.com/jonenine/HST