Ruby Fiber指南(五): 實現(xiàn)Actor,兼談Erlang的process調(diào)度
Posted on 2010-04-13 18:31 dennis 閱讀(5642) 評論(1) 編輯 收藏 所屬分類: 動態(tài)語言 、erlang 、my open-sourceRuby Fiber指南(一)基礎(chǔ)
Ruby Fiber指南(二)參數(shù)傳遞
Ruby Fiber指南(三)過濾器
Ruby Fiber指南(四)迭代器
Ruby Actor指南(五)實現(xiàn)Actor
寫這個指南的時候,計劃是第五章寫一個Fiber的應(yīng)用例子,但是一時沒有想到比較好的例子,模仿《Programming in Lua》中的多任務(wù)下載的例子也不合適,因為Ruby中的異步HttpClient跟lua還是很不一樣的,體現(xiàn)不了Fiber的優(yōu)點。因此,這第五節(jié)一直拖著沒寫。
恰巧最近在小組中做了一次Erlang的分享,有人問到Erlang調(diào)度器的實現(xiàn)問題,這塊我沒注意過,那時候就根據(jù)我對coroutine實現(xiàn)actor的想法做了下解釋,后來思考了下那個解釋是錯誤的,Erlang的調(diào)度器是搶占式的,而通過coroutine實現(xiàn)的actor調(diào)度卻是非搶占的,兩者還是截然不同的。我在《Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以實現(xiàn)actor風(fēng)格,actor跟coroutine并沒有必然的聯(lián)系,這篇文章的目的就在于證明這一點,使用Ruby Fiber實現(xiàn)一個簡單的actor風(fēng)格的庫,整個代碼不到100行。后面還會談到這個實現(xiàn)的缺點,以及我對Erlang調(diào)度器實現(xiàn)的理解。
首先是monkey patch,給Thread和Fiber類加上兩個方法,分別用于獲取當(dāng)前線程的調(diào)度器和Fiber對應(yīng)的actor:
class Thread
#得到當(dāng)前線程的調(diào)度器
def __scheduler__
@internal_scheduler||=FiberActor::Scheduler.new
end
end
class Fiber
#得到當(dāng)前Fiber的actor
def __actor__
@internal_actor
end
end
#得到當(dāng)前線程的調(diào)度器
def __scheduler__
@internal_scheduler||=FiberActor::Scheduler.new
end
end
class Fiber
#得到當(dāng)前Fiber的actor
def __actor__
@internal_actor
end
end
這里實現(xiàn)的actor仍然是Thread內(nèi)的,一個Thread只跑一個調(diào)度器,每個actor關(guān)聯(lián)一個Fiber。
讓我們來想想調(diào)度器該怎么實現(xiàn),調(diào)度器顧名思義就是協(xié)調(diào)actor的運行,每次挑選適當(dāng)?shù)腶ctor并執(zhí)行,可以想象調(diào)度器內(nèi)部應(yīng)該維護一個等待調(diào)度的actor隊列,Scheduler每次從隊列里取出一個actor并執(zhí)行,執(zhí)行完之后取下一個actor執(zhí)行,不斷循環(huán)持續(xù)這個過程;在沒有actor可以調(diào)度的時候,調(diào)度器應(yīng)該讓出執(zhí)行權(quán)。因此調(diào)度器本身也是一個Fiber,它內(nèi)部有個queue,用于維護等待調(diào)度的actor:
module FiberActor
class Scheduler
def initialize
@queue=[]
@running=false
end
def run
return if @running
@running=true
while true
#取出隊列中的actor并執(zhí)行
while actor=@queue.shift
begin
actor.fiber.resume
rescue => ex
puts "actor resume error,#{ex}"
end
end
#沒有任務(wù),讓出執(zhí)行權(quán)
Fiber.yield
end
end
def reschedule
if @running
#已經(jīng)啟動,只是被掛起,那么再次執(zhí)行
@fiber.resume
else
#將當(dāng)前actor加入隊列
self << Actor.current
end
end
def running?
@running
end
def <<(actor)
#將actor加入等待隊列
@queue << actor unless @queue.last == actor
#啟動調(diào)度器
unless @running
@queue << Actor.current
@fiber=Fiber.new { run }
@fiber.resume
end
end
end
end
class Scheduler
def initialize
@queue=[]
@running=false
end
def run
return if @running
@running=true
while true
#取出隊列中的actor并執(zhí)行
while actor=@queue.shift
begin
actor.fiber.resume
rescue => ex
puts "actor resume error,#{ex}"
end
end
#沒有任務(wù),讓出執(zhí)行權(quán)
Fiber.yield
end
end
def reschedule
if @running
#已經(jīng)啟動,只是被掛起,那么再次執(zhí)行
@fiber.resume
else
#將當(dāng)前actor加入隊列
self << Actor.current
end
end
def running?
@running
end
def <<(actor)
#將actor加入等待隊列
@queue << actor unless @queue.last == actor
#啟動調(diào)度器
unless @running
@queue << Actor.current
@fiber=Fiber.new { run }
@fiber.resume
end
end
end
end
run方法是核心的調(diào)度方法,注釋說明了主要的工作流程。因為調(diào)度器可能讓出執(zhí)行權(quán),因此提供了reschedule方法重新resume啟動調(diào)度器。<<方法用于將等待被調(diào)度的actor加入等待隊列,如果調(diào)度器沒有啟動,那么就啟動調(diào)度Fiber。
有了調(diào)度器,Actor的實現(xiàn)也很簡單,Actor跟Fiber是一對一的關(guān)系,Actor內(nèi)部維護一個mailbox,用來存儲接收到的消息。最重要的是receive原語的實現(xiàn),我們這里很簡單,不搞模式匹配,只是接收消息。receive的工作流程大概是這樣,判斷mailbox中有沒有消息,有消息的話,取出消息并調(diào)用block處理,沒有消息的話就yield讓出執(zhí)行權(quán)。
module FiberActor
class Actor
attr_accessor :fiber
#定義類方法
class << self
def scheduler
Thread.current.__scheduler__
end
def current
Fiber.current.__actor__
end
#啟動一個actor
def spawn(*args,&block)
fiber=Fiber.new do
block.call(args)
end
actor=new(fiber)
fiber.instance_variable_set :@internal_actor,actor
scheduler << actor
actor
end
def receive(&block)
current.receive(&block)
end
end
def initialize(fiber)
@mailbox=[]
@fiber=fiber
end
#給actor發(fā)送消息
def << (msg)
@mailbox << msg
#加入調(diào)度隊列
Actor.scheduler << self
end
def receive(&block)
#沒有消息的時候,讓出執(zhí)行權(quán)
Fiber.yield while @mailbox.empty?
msg=@mailbox.shift
block.call(msg)
end
def alive?
@fiber.alive?
end
end
end
class Actor
attr_accessor :fiber
#定義類方法
class << self
def scheduler
Thread.current.__scheduler__
end
def current
Fiber.current.__actor__
end
#啟動一個actor
def spawn(*args,&block)
fiber=Fiber.new do
block.call(args)
end
actor=new(fiber)
fiber.instance_variable_set :@internal_actor,actor
scheduler << actor
actor
end
def receive(&block)
current.receive(&block)
end
end
def initialize(fiber)
@mailbox=[]
@fiber=fiber
end
#給actor發(fā)送消息
def << (msg)
@mailbox << msg
#加入調(diào)度隊列
Actor.scheduler << self
end
def receive(&block)
#沒有消息的時候,讓出執(zhí)行權(quán)
Fiber.yield while @mailbox.empty?
msg=@mailbox.shift
block.call(msg)
end
def alive?
@fiber.alive?
end
end
end
Actor.spawn用于啟動一個actor,內(nèi)部其實是創(chuàng)建了一個fiber并包裝成actor給用戶,每個actor一被創(chuàng)建就加入調(diào)度器的等待隊列。<<方法用于向actor傳遞消息,傳遞消息后,該actor也將加入等待隊列,等待被調(diào)度。
我們的簡化版actor庫已經(jīng)寫完了,可以嘗試寫幾個例子,最簡單的hello world:
include FiberActor
Actor.spawn { puts "hello world!"}
輸出:Actor.spawn { puts "hello world!"}
hello world!
沒有問題,那么試試傳遞消息:
actor=Actor.spawn{
Actor.receive{ |msg| puts "receive #{msg}"}
}
actor << :test_message
輸出:Actor.receive{ |msg| puts "receive #{msg}"}
}
actor << :test_message
receive test_message
也成了,那么試試兩個actor互相傳遞消息,乒乓一下下:
pong=Actor.spawn do
Actor.receive do |ping|
#收到ping,返回pong
ping << :pong
end
end
ping=Actor.spawn do
#ping一下,將ping作為消息傳遞
pong << Actor.current
Actor.receive do |msg|
#接收到pong
puts "ping #{msg}"
end
end
#resume調(diào)度器
Actor.scheduler.reschedule
Actor.receive do |ping|
#收到ping,返回pong
ping << :pong
end
end
ping=Actor.spawn do
#ping一下,將ping作為消息傳遞
pong << Actor.current
Actor.receive do |msg|
#接收到pong
puts "ping #{msg}"
end
end
#resume調(diào)度器
Actor.scheduler.reschedule
輸出:
ping pong
都沒有問題,這個超級簡單actor基本完成了。可以看到,利用coroutine來實現(xiàn)actor是完全可行的,事實上我這里描述的實現(xiàn)基本上是revactor這個庫的實現(xiàn)原理。revactor是一個ruby的actor庫,它的實現(xiàn)就是基于Fiber,并且支持消息的模式匹配和thread之間的actor調(diào)度,有興趣地可以去玩下。更進一步,其實采用輕量級協(xié)程來模擬actor風(fēng)格早就不是新鮮主意,比如在cn-erlounge的第四次會議上就有兩個topic是關(guān)于這個,一個是51.com利用基于ucontext的實現(xiàn)的類erlang進程模型,一個是許世偉的CERL。可以想見,他們的基本原理跟本文所描述不會有太大差別,那么面對的問題也是一樣。
采用coroutine實現(xiàn)actor的主要缺點如下:
1、因為是非搶占式,這就要求actor不能有阻塞操作,任何阻塞操作都需要異步化。IO可以使用異步IO,沒有os原生支持的就需要利用線程池,基本上是一個重復(fù)造輪子的過程。
2、異常的隔離,某個actor的異常不能影響到調(diào)度器的運轉(zhuǎn),簡單的try...catch是不夠的。
3、多核的利用,調(diào)度器只能跑在一個線程上,無法充分利用多核優(yōu)勢。
4、效率因素,在actor數(shù)量劇增的情況下,簡單的FIFO的調(diào)度策略效率是個瓶頸,盡管coroutine的切換已經(jīng)非常高效。
當(dāng)然,上面提到的這些問題并非無法解決,例如可以使用多線程多個調(diào)度器,類似erlang smp那樣來解決單個調(diào)度器的問題。但是如調(diào)度效率這樣的問題是很難解決的。相反,erlang的actor實現(xiàn)就不是通過coroutine,而是自己實現(xiàn)一套類似os的調(diào)度程序。
首先明確一點,Erlang的process的調(diào)度是搶占式的,而非couroutine的協(xié)作式的。其次,Erlang早期版本是只有一個調(diào)度器,運行在一個線程上,隨著erts的發(fā)展,現(xiàn)在erlang的調(diào)度器已經(jīng)支持smp,每個cpu關(guān)聯(lián)一個調(diào)度器,并且可以明確指定哪個調(diào)度器綁定到哪個cpu上。第三,Erlang的調(diào)度也是采用優(yōu)先隊列+時間片輪詢的方式,每個調(diào)度器關(guān)聯(lián)一個ErtsRunQueue
從上面的描述可以看出,Erlang優(yōu)秀的地方不僅在于actor風(fēng)格的輕量級process,另一個強悍的地方就是它的類os的調(diào)度器,再加上OTP庫的完美支持,這不是一般方案能山寨的。