莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理
             
              Ruby Fiber指南(一)基礎
              Ruby Fiber指南(二)參數傳遞
              Ruby Fiber指南(三)過濾器
              Ruby Fiber指南(四)迭代器
              Ruby Actor指南(五)實現Actor

              寫這個指南的時候,計劃是第五章寫一個Fiber的應用例子,但是一時沒有想到比較好的例子,模仿《Programming in Lua》中的多任務下載的例子也不合適,因為Ruby中的異步HttpClient跟lua還是很不一樣的,體現不了Fiber的優點。因此,這第五節一直拖著沒寫。
              恰巧最近在小組中做了一次Erlang的分享,有人問到Erlang調度器的實現問題,這塊我沒注意過,那時候就根據我對coroutine實現actor的想法做了下解釋,后來思考了下那個解釋是錯誤的,Erlang的調度器是搶占式的,而通過coroutine實現的actor調度卻是非搶占的,兩者還是截然不同的。我在《Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以實現actor風格,actor跟coroutine并沒有必然的聯系,這篇文章的目的就在于證明這一點,使用Ruby Fiber實現一個簡單的actor風格的庫,整個代碼不到100行。后面還會談到這個實現的缺點,以及我對Erlang調度器實現的理解。

              首先是monkey patch,給Thread和Fiber類加上兩個方法,分別用于獲取當前線程的調度器和Fiber對應的actor:
          class Thread
            
          #得到當前線程的調度器
            def __scheduler__
              @internal_scheduler
          ||=FiberActor::Scheduler.new
            end
          end

          class Fiber
            
          #得到當前Fiber的actor
            def __actor__
              @internal_actor
            end
          end

               這里實現的actor仍然是Thread內的,一個Thread只跑一個調度器,每個actor關聯一個Fiber。
               讓我們來想想調度器該怎么實現,調度器顧名思義就是協調actor的運行,每次挑選適當的actor并執行,可以想象調度器內部應該維護一個等待調度的actor隊列,Scheduler每次從隊列里取出一個actor并執行,執行完之后取下一個actor執行,不斷循環持續這個過程;在沒有actor可以調度的時候,調度器應該讓出執行權。因此調度器本身也是一個Fiber,它內部有個queue,用于維護等待調度的actor:
          module FiberActor
            
          class Scheduler
              
          def initialize
                @queue
          =[]
                @running
          =false
              end

              
          def run
                
          return if @running
                @running
          =true
                
          while true
                  
          #取出隊列中的actor并執行
                  while actor=@queue.shift
                    begin
                      actor.fiber.resume
                    rescue 
          => ex
                      puts 
          "actor resume error,#{ex}"
                    end
                  end
                  
          #沒有任務,讓出執行權
                  Fiber.yield
                end
              end

              
          def reschedule
                
          if @running
                  
          #已經啟動,只是被掛起,那么再次執行
                  @fiber.resume
                
          else
                  
          #將當前actor加入隊列
                  self << Actor.current
                end
              end

              
          def running?
                @running
              end

              
          def <<(actor)
                
          #將actor加入等待隊列
                @queue << actor unless @queue.last == actor
                
          #啟動調度器
                unless @running
                   @queue 
          << Actor.current
                   @fiber
          =Fiber.new { run }
                   @fiber.resume
                end
              end
            end
          end

              run方法是核心的調度方法,注釋說明了主要的工作流程。因為調度器可能讓出執行權,因此提供了reschedule方法重新resume啟動調度器。<<方法用于將等待被調度的actor加入等待隊列,如果調度器沒有啟動,那么就啟動調度Fiber。

              有了調度器,Actor的實現也很簡單,Actor跟Fiber是一對一的關系,Actor內部維護一個mailbox,用來存儲接收到的消息。最重要的是receive原語的實現,我們這里很簡單,不搞模式匹配,只是接收消息。receive的工作流程大概是這樣,判斷mailbox中有沒有消息,有消息的話,取出消息并調用block處理,沒有消息的話就yield讓出執行權。

          module FiberActor  
            
          class Actor
              attr_accessor :fiber
              
          #定義類方法
              class << self
                
          def scheduler
                  Thread.current.
          __scheduler__
                end

                
          def current
                  Fiber.current.
          __actor__
                end

                
          #啟動一個actor
                def spawn(*args,&block)
                  fiber
          =Fiber.new do
                     block.call(args)
                  end
                  actor
          =new(fiber)
                  fiber.instance_variable_set :@internal_actor,actor
                  scheduler 
          << actor
                  actor
                end

                
          def receive(&block)
                  current.receive(
          &block)
                end
              end

              
          def initialize(fiber)
                 @mailbox
          =[]
                 @fiber
          =fiber
              end

              
          #給actor發送消息
              def << (msg)
                @mailbox 
          << msg
                
          #加入調度隊列
                Actor.scheduler << self
              end

              
          def receive(&block)
                
          #沒有消息的時候,讓出執行權
                Fiber.yield while @mailbox.empty?
                msg
          =@mailbox.shift
                block.call(msg)
              end

              
          def alive?
                @fiber.alive?
              end
            end

          end

              Actor.spawn用于啟動一個actor,內部其實是創建了一個fiber并包裝成actor給用戶,每個actor一被創建就加入調度器的等待隊列。<<方法用于向actor傳遞消息,傳遞消息后,該actor也將加入等待隊列,等待被調度。

              我們的簡化版actor庫已經寫完了,可以嘗試寫幾個例子,最簡單的hello world:
          include FiberActor

          Actor.spawn { puts 
          "hello world!"}
               輸出:
          hello world!

              沒有問題,那么試試傳遞消息:
          actor=Actor.spawn{
             Actor.receive{ 
          |msg|  puts "receive #{msg}"}
          }
          actor 
          << :test_message
              輸出:
          receive test_message
              
              也成了,那么試試兩個actor互相傳遞消息,乒乓一下下:
          pong=Actor.spawn do
                Actor.receive do 
          |ping|
                  
          #收到ping,返回pong
                  ping << :pong
                end
              end
          ping
          =Actor.spawn do
                
          #ping一下,將ping作為消息傳遞
                pong << Actor.current
                Actor.receive do 
          |msg|
                  
          #接收到pong
                  puts "ping #{msg}"
                end
              end
          #resume調度器
          Actor.scheduler.reschedule

               輸出:
          ping pong
              
               都沒有問題,這個超級簡單actor基本完成了。可以看到,利用coroutine來實現actor是完全可行的,事實上我這里描述的實現基本上是revactor這個庫的實現原理。revactor是一個ruby的actor庫,它的實現就是基于Fiber,并且支持消息的模式匹配和thread之間的actor調度,有興趣地可以去玩下。更進一步,其實采用輕量級協程來模擬actor風格早就不是新鮮主意,比如在cn-erlounge的第四次會議上就有兩個topic是關于這個,一個是51.com利用基于ucontext的實現的類erlang進程模型,一個是許世偉的CERL。可以想見,他們的基本原理跟本文所描述不會有太大差別,那么面對的問題也是一樣。

               采用coroutine實現actor的主要缺點如下:
          1、因為是非搶占式,這就要求actor不能有阻塞操作,任何阻塞操作都需要異步化。IO可以使用異步IO,沒有os原生支持的就需要利用線程池,基本上是一個重復造輪子的過程。
          2、異常的隔離,某個actor的異常不能影響到調度器的運轉,簡單的try...catch是不夠的。
          3、多核的利用,調度器只能跑在一個線程上,無法充分利用多核優勢。
          4、效率因素,在actor數量劇增的情況下,簡單的FIFO的調度策略效率是個瓶頸,盡管coroutine的切換已經非常高效。

              當然,上面提到的這些問題并非無法解決,例如可以使用多線程多個調度器,類似erlang smp那樣來解決單個調度器的問題。但是如調度效率這樣的問題是很難解決的。相反,erlang的actor實現就不是通過coroutine,而是自己實現一套類似os的調度程序。
              首先明確一點,Erlang的process的調度是搶占式的,而非couroutine的協作式的。其次,Erlang早期版本是只有一個調度器,運行在一個線程上,隨著erts的發展,現在erlang的調度器已經支持smp,每個cpu關聯一個調度器,并且可以明確指定哪個調度器綁定到哪個cpu上。第三,Erlang的調度也是采用優先隊列+時間片輪詢的方式,每個調度器關聯一個ErtsRunQueueErtsRunQueue內部又分為三個ErtsRunPrioQueue隊列,分別對應high,max和normal,low的優先級,其中normal和low共用一個隊列;在Erlang中時間片是以reduction為單位,你可以將reduction理解成一次函數調用,每個被調度的process能執行的reduction次數是有限的。調度器每次都是從max隊列開始尋找等待調度的process并執行,當前調度的隊列如果為空或者執行的reductions超過限制,那么就降低優先級,調度下一個隊列。

             從上面的描述可以看出,Erlang優秀的地方不僅在于actor風格的輕量級process,另一個強悍的地方就是它的類os的調度器,再加上OTP庫的完美支持,這不是一般方案能山寨的。
              
              
            

          評論

          # re: Ruby Fiber指南(五): 實現Actor,兼談Erlang的process調度  回復  更多評論   

          2013-11-04 02:39 by 我傻逼我自豪
          協程=>流程控制
          actor=>對象抽象
          主站蜘蛛池模板: 广元市| 九龙县| 恭城| 龙游县| 淮南市| 南开区| 珠海市| 麻阳| 莱芜市| 金阳县| 云南省| 千阳县| 南木林县| 永登县| 渝北区| 德化县| 光山县| 龙州县| 斗六市| 周至县| 博兴县| 开平市| 天全县| 新泰市| 邵阳市| 泰兴市| 称多县| 西平县| 湾仔区| 长葛市| 泰来县| 安仁县| 大化| 锡林郭勒盟| 桐城市| 名山县| 平塘县| 房产| 德惠市| 九台市| 绥化市|