在Java中使用協(xié)程(Coroutine)
本篇blog將講述coroutine的一些背景知識,以及在Java中如何使用Coroutine,包括一個簡單的benchmark對比,希望能借助這篇blog讓大家了解到更多在java中使用coroutine的方法,本篇blog的PDF版本可從此下載:http://www.bluedavy.com/open/UseCoroutineInJava.pdf
在講到具體內(nèi)容之前,不能不先講下Coroutine的一些背景知識,來先具體了解下什么是Coroutine。
1.
背景知識
現(xiàn)在的操作系統(tǒng)都是支持多任務(wù)的,多任務(wù)可通過多進程或多線程的方式去實現(xiàn),進程和線程的對比就不在這里說了,在多任務(wù)的調(diào)度上操作系統(tǒng)采取搶占式和協(xié)作式兩種方式,搶占式是指操作系統(tǒng)給每個任務(wù)一定的執(zhí)行時間片,在到達這個時間片后如任務(wù)仍然未釋放對CPU的占用,那么操作系統(tǒng)將強制釋放,這是目前多數(shù)操作系統(tǒng)采取的方式;協(xié)作式是指操作系統(tǒng)按照任務(wù)的順序來分配CPU,每個任務(wù)執(zhí)行過程中除非其主動釋放,否則將一直占據(jù)CPU,這種方式非常值得注意的是一旦有任務(wù)占據(jù)CPU不放,會導致其他任務(wù)”餓死”的現(xiàn)象,因此操作系統(tǒng)確實不太適合采用這種方式。
說完操作系統(tǒng)多任務(wù)的調(diào)度方式后,來看看通常程序是如何實現(xiàn)支持高并發(fā)的,一種就是典型的基于操作系統(tǒng)提供的多進程或多線程機制,每個任務(wù)占據(jù)一個進程或一個線程,當任務(wù)中有IO等待等動作時,則將進程或線程放入待調(diào)度隊列中,這種方式是目前大多數(shù)程序采取的方式,這種方式的壞處在于如想支持高的并發(fā)量,就不得不創(chuàng)建很多的進程或線程,而進程和線程都是要消耗不少系統(tǒng)資源的,另外一方面,進程或線程創(chuàng)建太多后,操作系統(tǒng)需要花費很多的時間在進程或線程的切換上,切換動作需要做狀態(tài)保持和恢復,這也會消耗掉很多的系統(tǒng)資源;另外一種方式則是每個任務(wù)不完全占據(jù)一個進程或線程,當任務(wù)執(zhí)行過程中需要進行IO等待等動作時,任務(wù)則將其所占據(jù)的進程或線程釋放,以便其他任務(wù)使用這個進程或線程,這種方式的好處在于可以減少所需要的原生的進程或線程數(shù),并且由于操作系統(tǒng)不需要做進程或線程的切換,而是自行來實現(xiàn)任務(wù)的切換,其成本會較操作系統(tǒng)切換低,這種方式也就是本文的重點,Coroutine方式,又稱協(xié)程方式,這種方式在目前的大多數(shù)語言中都有支持。
各種語言在實現(xiàn)Coroutine方式的支持時,多數(shù)都采用了Actor Model來實現(xiàn),Actor Model簡單來說就是每個任務(wù)就是一個Actor,Actor之間通過消息傳遞的方式來進行交互,而不采用共享的方式,Actor可以看做是一個輕量級的進程或線程,通常在一臺4G內(nèi)存的機器上,創(chuàng)建幾十萬個Actor是毫無問題的,Actor支持Continuations,即對于如下代碼:
Actor
act方法
進行一些處理
創(chuàng)建并執(zhí)行另外一個Actor
通過消息box阻塞獲取另一個Actor執(zhí)行的結(jié)果
繼續(xù)基于這個結(jié)果進行一些處理
在支持Continuations的情況下,可以做到消息box阻塞時并不是進程或線程級的阻塞,而只是Actor本身的阻塞,并且在阻塞時可將所占據(jù)的進程或線程釋放給其他Actor使用,Actor Model實現(xiàn)最典型的就是erLang了。
對于Java應(yīng)用而言,傳統(tǒng)方式下為了支持高并發(fā),由于一個線程只能用于處理一個請求,即使是線程中其實有很多IO中斷、鎖等待也同樣如此,因此通常的做法是通過啟動很多的線程來支撐高并發(fā),但當線程過多時,就造成了CPU需要消耗不少的時間在線程的切換上,從而出現(xiàn)瓶頸,按照上面對Coroutine的描述,Coroutine的方式理論上而言能夠大幅度的提升Java應(yīng)用所能支撐的并發(fā)量。
2. 在Java中使用Coroutine
Java尚不能從語言層次上支持Coroutine,也許Java 7能夠支持,目前已經(jīng)有了一個測試性質(zhì)的版本[1],在Sun JDK 7尚未正式發(fā)布的情況下如希望在Java中使用Coroutine,Scala或Kilim是可以做的選擇,來分別看下。
Scala是現(xiàn)在很火的語言之一,Twitter消息中間件基于Scala編寫更是讓Scala名聲鵲起,除了在語法方面所做出的改進外,其中一個最突出的特色就是Scala Actor,Scala Actor是Scala用于實現(xiàn)Coroutine的方式,先來具體看看Scala在Coroutine支持實現(xiàn)的關(guān)鍵概念。
l Actor
Scala Actor可以看做是一個輕量級的Java Thread,其使用方式和Java Thread基本也一致,繼承Actor,實現(xiàn)act方法,啟動時也是調(diào)用start方法,但和Java Thread不同的是,Scala Actor可等待外部發(fā)送過來的消息,并進行相應(yīng)的處理。
l Actor的消息發(fā)送機制
發(fā)送消息到Actor的方式有異步、Future兩種方式,異步即指發(fā)送后立即返回,繼續(xù)后續(xù)流程,使用異步發(fā)送的方法為:actor ! MessageObject,其中消息對象可以為任何類型,并且Scala還支持一種稱為case Object的對象,便于在收到消息時做pattern matching。
Future方式是指阻塞線程等待消息處理的結(jié)果,使用Future方式發(fā)送的方法為:actor !! MessageObject,在等待結(jié)果方面,Scala支持不限時等待,限時等待以及等待多個Future或個別Future完成,使用方法如下:
val ft=actor !! MessageObject // Future方式發(fā)送消息
val result=ft() // 不限時等待
val results=awaitAll(500,ft1,ft2,ft3) // 限時等待多個Future返回值
val results=awaitEither(ft1,ft2) // 等待個別future完成
接收消息方通過reply方法返回Future方式所等待的結(jié)果。
l Actor的消息接收機制
當代碼處于Actor的act方法或Actor環(huán)境(例如為Actor的act方法調(diào)用過來的代碼)中時,可通過以下兩種方式來接收外部發(fā)送給Actor的消息:一為receive方式,二為react方式,代碼例子如下:
receive{
case MessageObject(args) => doHandle(args)
}
react{
case MessageObject(args) => doHandle(args)
}
receive和react的差別在于receive需要阻塞當前Java線程,react則僅為阻塞當前Actor,但并不會阻塞Java線程,因此react模式更適合于充分發(fā)揮coroutine帶來的原生線程數(shù)減少的好處,但react模式有個缺點是react不支持返回。
receive和react都有限時接收的方式,方法為:receiveWithin(timeout)、reactWithin(timeout),超時的消息通過case TIMEOUT的方式來接收。
下面來看基于Scala Actor實現(xiàn)并發(fā)處理請求的一個簡單例子。
class Processor extends Actor{
def act(){
loop{
react{
case command:String => doHandle(command)
}
}
}
def doHandle(command:String){
// 業(yè)務(wù)邏輯處理
}
}
當需要并發(fā)執(zhí)行此Processor時,在處理時需要的僅為調(diào)用以下代碼:
val processor=new Processor()
processor.start
processor ! “Hello”
從以上說明來看,要在舊的應(yīng)用中使用Scala還是會有一些成本,部署運行則非常簡單,在Scala IDE Plugin編寫了上面的scala代碼后,即生成了java class文件,可直接在jvm中運行。
Kilim是由劍橋的兩位博士開發(fā)的一個用于在Java中使用Coroutine的框架,Kilim基于Java語法,先來看看Kilim中的關(guān)鍵概念。
l Task
可以認為Task就是Actor,使用方式和Java Thread基本相同,只是繼承的為Task,覆蓋的為execute方法,啟動也是調(diào)用task的start方法。
l Task的消息發(fā)送機制
Kilim中通過Mailbox對象來發(fā)送消息,Mailbox的基本原則為可以有多個消息發(fā)送者,但只能有一個消息接收者,發(fā)送的方式有同步發(fā)送、異步發(fā)送和阻塞線程方式的同步發(fā)送三種,同步發(fā)送是指保證一定能將消息放入發(fā)送隊列中,如當前發(fā)送隊列已滿,則等待到可用為止,阻塞的為當前Task;異步發(fā)送則是嘗試將消息放入發(fā)送隊列一次,如失敗,則返回false,成功則返回true,不會阻塞Task;阻塞線程方式的同步發(fā)送是指阻塞當前線程,并保證將消息發(fā)送給接收者,三種方式的使用方法如下:
mailbox.put(messageObject); // 同步發(fā)送
mailbox.putnb(messageObject); // 異步發(fā)送
mailbox.putb(messageObject); // 阻塞線程方式發(fā)送
l Task的消息接收機制
Kilim中通過Mailbox來接收消息,接收消息的方式有同步接收、異步接收以及阻塞線程方式的同步接收三種,同步接收是指阻塞當前Task,直到接收到消息才返回;異步接收是指立刻返回Mailbox中的消息,有就返回,沒有則返回null;阻塞線程方式的同步接收是指阻塞當前線程,直到接收到消息才返回,使用方法如下:
mailbox.get(); // 同步接收,傳入long參數(shù)表示等待的超時時間,單位為毫秒
mailbox.getnb(); // 異步接收,立刻返回
mailbox.getb(); // 阻塞線程方式接收
下面來看基于Kilim實現(xiàn)并發(fā)處理請求的一個簡單例子。
public class Processor extends Task{
private String command;
public Processor(String command){
this.command=command;
}
public void execute() throws Pausable,Exception{
// 業(yè)務(wù)邏輯處理
}
}
在處理時,僅需調(diào)用以下代碼:
Task processor=new Processor(command);
processor.start();
從以上代碼來看,Kilim對于Java人員而言學習門檻更低,但對于需要采用coroutine方式執(zhí)行的代碼在編譯完畢后,還需要采用Kilim的kilim.tools.Weaver類來對這些已編譯出來的class文件做織入,運行時需要用織入后生成的class文件才行,織入的方法為:java kilim.tools.Weaver –d [織入后生成的class文件存放的目錄] [需要織入的類文件所在的目錄],目前尚沒有Kilim IDE Plugin可用,因此weaver這個過程還是比較的麻煩。
上面對Scala和Kilim做了一個簡單的介紹,在實際Java應(yīng)用中使用Coroutine時,通常會出現(xiàn)以下幾種典型的更復雜的使用場景,由于Actor模式本身就是異步的,因此其天然對異步場景支持的就非常好,更多的問題會出現(xiàn)在以下幾個同步場景上,分別來看看基于Scala、Kilim如何來實現(xiàn)。
l Actor同步調(diào)用
Actor同步調(diào)用是經(jīng)常會出現(xiàn)的使用場景,主要為Actor發(fā)送消息給其他的Actor處理,并等待結(jié)果才能繼續(xù)。
n Scala
對于這種情況,在Scala 2.7.7中,目前可采取的為以下兩種方法:
一種為通過Future方式發(fā)送消息來實現(xiàn):
class Processor(command:String) extends Actor{
def act(){
val actor=new NetSenderActor()
val ft=actor !! command
println(ft())
}
}
class NetSenderActor extends Actor{
def act(){
case command:String => {
reply(“received command:”+command)
}
}
}
第二種為通過receive的方式來實現(xiàn):
class Processor(command:String) extends Actor{
def act(){
val actor=new NetSenderActor()
actor ! command
var senderResult=””
receive{
case result:String => {
senderResult=result
}
}
println(senderResult)
}
}
class NetSenderActor extends Actor{
def act(){
case command:String => {
sender ! (“received command:”+command)
}
}
}
但這兩種方式其實都不好,因為這兩種方式都會造成當前Actor的線程阻塞,這也是因為目前Scala版本對continuations尚不支持的原因,Scala 2.8版本將提供continuations的支持,希望到時能有不需要阻塞Actor線程實現(xiàn)上述需求的方法。
還有一種常見的場景是Actor調(diào)一段普通的Scala類,然后那個類中進行了一些處理,并調(diào)用了其他Actor,此時在該類中如需要等待Actor的返回結(jié)果,也可使用上面兩種方法。
n Kilim
在Kilim中要實現(xiàn)Task之間的同步調(diào)用非常簡單,代碼如下:
public class TaskA extends Task{
public void execute() throws Pausable,Exception{
Mailbox<Object> result=new Mailbox<Object>();
Task task=new TaskB(result);
task.start();
Object resultObject=result.get();
System.out.println(resultObject);
}
}
public class TaskB extends Task{
private Mailbox<Object> result;
public TaskB(Mailbox<Object> result){
this.result=result;
}
public void execute() throws Pausable,Exception{
result.put(“result from TaskB”);
}
}
Kilim的Mailbox.get并不會阻塞線程,因此這種方式是完全滿足需求的。
l 普通Java代碼同步調(diào)用Actor
由于已有的應(yīng)用是普通的Java代碼,經(jīng)常會出現(xiàn)這樣的場景,就是希望實現(xiàn)在這些Java代碼中同步的調(diào)用Actor,并等待Actor的返回結(jié)果,但由于Scala和Kilim都強調(diào)首先必須是在Actor或Task的環(huán)境下才行,因此此場景更佳的方式應(yīng)為Scala Actor(Kilim Task) à Java Code à Scala Actor(Kilim Task),這種場景在對已有的應(yīng)用中會是最常出現(xiàn)的,來看看在Scala和Kilim中如何應(yīng)對這樣的需求。
n Scala
目前Scala中如希望在Java Code中調(diào)用Scala Actor,并等待其返回結(jié)果,暫時還沒辦法,做法只能改為從Java Code中去調(diào)一個Scala的Object,然后在這個Object中調(diào)用Actor,并借助上面提到的receive或future的方法來獲取返回值,最后將這個返回值返回Java Code。
n Kilim
目前Kilim中如希望實現(xiàn)上面的需求,其實非常簡單,只需要在Java Code的方法上加上Throw Pausable,然后通過mailbox.get來等待Kilim Task返回的結(jié)果即可,在Kilim中只要調(diào)用棧上的每個方法都有Throw Pausable,就可在這些方法上做等待返回這類的同步操作。
從上面這兩個最常見的需求來看,無疑Kilim更符合需求,但要注意的是對于Kilim而言,如果出現(xiàn)Task à nonpausable method à pausable method這樣的狀況時,pausable method中如果想執(zhí)行阻塞當前Task的操作,是無法做到的,只能改造成Task (在mailbox上做等待,并傳遞mailbox給后續(xù)步驟) à nonpausable method (傳遞mailbox) à pausable method (將邏輯轉(zhuǎn)為放入一個Task中,并將返回值放入傳遞過來的mailbox),這種狀況在面對spring aop、反射調(diào)用等現(xiàn)象時就會出現(xiàn)了,目前kilim 0.6的版本尚未提供更透明的使用方法,不過據(jù)kilim作者提供的一個試用版本,其中已經(jīng)有了對于反射調(diào)用的透明化的支持,暫時在目前只能采用上述方法,遷移成本相對較大,也許以后的kilim版本會考慮這樣的場景,提供相應(yīng)的方法來降低遷移的成本。
3. 性能、所能支撐的并發(fā)量對比
在對Scala、Kilim有了這些了解后,來具體看看采用Scala、Kilim后與傳統(tǒng)Java方式在性能、所能支撐的并發(fā)量上的對比。
l 測試模型
采用一個比較簡單的模型進行測試,具體為有4個線程,這4個線程分別接收到了一定數(shù)量的請求,每個請求需要交給另外一個線程去執(zhí)行,這個線程所做的動作為循環(huán)10次獲取另外一個線程的執(zhí)行結(jié)果,此執(zhí)行線程所做的動作為循環(huán)1000次拼接一個字符串,然后返回。
l 實現(xiàn)代碼
由于目前Scala版本對Continuation支持不夠好,但上面的場景中又有此類需求,所以導致Scala版本的代碼寫的比較麻煩一些。
實現(xiàn)代碼以及可運行的環(huán)境請從此處下載:
http://www.bluedavy.com/open/benchmark.zip
l 結(jié)果對比
測試機器為一臺4核的linux機器。
TPS的對比結(jié)果如下:
Load的對比結(jié)果如下:
從上面的測試結(jié)果來看,在這個benchmark的場景中,基于Kilim和Scala實現(xiàn)的Coroutine版本在隨著請求數(shù)增長的情況下load的增長幅度都比純粹的Java版本低很多,Kilim版本表現(xiàn)尤其突出,在TPS方面,由于目前Scala版本對Continuation支持的不好,因此在這個測試場景中有點吃虧,表現(xiàn)反而最差,經(jīng)過上面的測試可以看到,基于Coroutine版本可以以同樣的load或更低的load來支撐更高的TPS。
到此為止,基本上對Java中使用Coroutine的相關(guān)知識做了一個介紹,總結(jié)而言,采用Coroutine方式可以很好的繞開需要啟動太多線程來支撐高并發(fā)出現(xiàn)的瓶頸,提高Java應(yīng)用所能支撐的并發(fā)量,但在開發(fā)模式上也會帶來變化,并且需要特別注意不能造成線程被阻塞的現(xiàn)象,從開發(fā)易用和透明遷移現(xiàn)有Java應(yīng)用兩個角度而言目前Coroutine方式還有很多不足,但相信隨著越來越多的人在Java中使用Coroutine,其易用性必然是能夠得到提升的。
4. 參考資料
1. http://en.wikipedia.org/wiki/Computer_multitasking
2. http://en.wikipedia.org/wiki/Coroutine
3. http://en.wikipedia.org/wiki/Actor_model
4. http://en.wikipedia.org/wiki/Continuation
5. http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf
6. http://www.scala-lang.org/sites/default/files/odersky/jmlc06.pdf
posted on 2010-01-28 23:16 BlueDavy 閱讀(25567) 評論(27) 編輯 收藏 所屬分類: Internet