莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理
             
              Ruby Fiber指南(一)基礎(chǔ)
              Ruby Fiber指南(二)參數(shù)傳遞
              Ruby Fiber指南(三)過濾器
              Ruby Fiber指南(四)迭代器
              Ruby Actor指南(五)實現(xiàn)Actor

              寫這個指南的時候,計劃是第五章寫一個Fiber的應(yīng)用例子,但是一時沒有想到比較好的例子,模仿《Programming in Lua》中的多任務(wù)下載的例子也不合適,因為Ruby中的異步HttpClient跟lua還是很不一樣的,體現(xiàn)不了Fiber的優(yōu)點。因此,這第五節(jié)一直拖著沒寫。
              恰巧最近在小組中做了一次Erlang的分享,有人問到Erlang調(diào)度器的實現(xiàn)問題,這塊我沒注意過,那時候就根據(jù)我對coroutine實現(xiàn)actor的想法做了下解釋,后來思考了下那個解釋是錯誤的,Erlang的調(diào)度器是搶占式的,而通過coroutine實現(xiàn)的actor調(diào)度卻是非搶占的,兩者還是截然不同的。我在《Actor、Coroutine和Continuation的概念澄清》中提到coroutine可以實現(xiàn)actor風(fēng)格,actor跟coroutine并沒有必然的聯(lián)系,這篇文章的目的就在于證明這一點,使用Ruby Fiber實現(xiàn)一個簡單的actor風(fēng)格的庫,整個代碼不到100行。后面還會談到這個實現(xiàn)的缺點,以及我對Erlang調(diào)度器實現(xiàn)的理解。

              首先是monkey patch,給Thread和Fiber類加上兩個方法,分別用于獲取當(dāng)前線程的調(diào)度器和Fiber對應(yīng)的actor:
          class Thread
            
          #得到當(dāng)前線程的調(diào)度器
            def __scheduler__
              @internal_scheduler
          ||=FiberActor::Scheduler.new
            end
          end

          class Fiber
            
          #得到當(dāng)前Fiber的actor
            def __actor__
              @internal_actor
            end
          end

               這里實現(xiàn)的actor仍然是Thread內(nèi)的,一個Thread只跑一個調(diào)度器,每個actor關(guān)聯(lián)一個Fiber。
               讓我們來想想調(diào)度器該怎么實現(xiàn),調(diào)度器顧名思義就是協(xié)調(diào)actor的運行,每次挑選適當(dāng)?shù)腶ctor并執(zhí)行,可以想象調(diào)度器內(nèi)部應(yīng)該維護一個等待調(diào)度的actor隊列,Scheduler每次從隊列里取出一個actor并執(zhí)行,執(zhí)行完之后取下一個actor執(zhí)行,不斷循環(huán)持續(xù)這個過程;在沒有actor可以調(diào)度的時候,調(diào)度器應(yīng)該讓出執(zhí)行權(quán)。因此調(diào)度器本身也是一個Fiber,它內(nèi)部有個queue,用于維護等待調(diào)度的actor:
          module FiberActor
            
          class Scheduler
              
          def initialize
                @queue
          =[]
                @running
          =false
              end

              
          def run
                
          return if @running
                @running
          =true
                
          while true
                  
          #取出隊列中的actor并執(zhí)行
                  while actor=@queue.shift
                    begin
                      actor.fiber.resume
                    rescue 
          => ex
                      puts 
          "actor resume error,#{ex}"
                    end
                  end
                  
          #沒有任務(wù),讓出執(zhí)行權(quán)
                  Fiber.yield
                end
              end

              
          def reschedule
                
          if @running
                  
          #已經(jīng)啟動,只是被掛起,那么再次執(zhí)行
                  @fiber.resume
                
          else
                  
          #將當(dāng)前actor加入隊列
                  self << Actor.current
                end
              end

              
          def running?
                @running
              end

              
          def <<(actor)
                
          #將actor加入等待隊列
                @queue << actor unless @queue.last == actor
                
          #啟動調(diào)度器
                unless @running
                   @queue 
          << Actor.current
                   @fiber
          =Fiber.new { run }
                   @fiber.resume
                end
              end
            end
          end

              run方法是核心的調(diào)度方法,注釋說明了主要的工作流程。因為調(diào)度器可能讓出執(zhí)行權(quán),因此提供了reschedule方法重新resume啟動調(diào)度器。<<方法用于將等待被調(diào)度的actor加入等待隊列,如果調(diào)度器沒有啟動,那么就啟動調(diào)度Fiber。

              有了調(diào)度器,Actor的實現(xiàn)也很簡單,Actor跟Fiber是一對一的關(guān)系,Actor內(nèi)部維護一個mailbox,用來存儲接收到的消息。最重要的是receive原語的實現(xiàn),我們這里很簡單,不搞模式匹配,只是接收消息。receive的工作流程大概是這樣,判斷mailbox中有沒有消息,有消息的話,取出消息并調(diào)用block處理,沒有消息的話就yield讓出執(zhí)行權(quán)。

          module FiberActor  
            
          class Actor
              attr_accessor :fiber
              
          #定義類方法
              class << self
                
          def scheduler
                  Thread.current.
          __scheduler__
                end

                
          def current
                  Fiber.current.
          __actor__
                end

                
          #啟動一個actor
                def spawn(*args,&block)
                  fiber
          =Fiber.new do
                     block.call(args)
                  end
                  actor
          =new(fiber)
                  fiber.instance_variable_set :@internal_actor,actor
                  scheduler 
          << actor
                  actor
                end

                
          def receive(&block)
                  current.receive(
          &block)
                end
              end

              
          def initialize(fiber)
                 @mailbox
          =[]
                 @fiber
          =fiber
              end

              
          #給actor發(fā)送消息
              def << (msg)
                @mailbox 
          << msg
                
          #加入調(diào)度隊列
                Actor.scheduler << self
              end

              
          def receive(&block)
                
          #沒有消息的時候,讓出執(zhí)行權(quán)
                Fiber.yield while @mailbox.empty?
                msg
          =@mailbox.shift
                block.call(msg)
              end

              
          def alive?
                @fiber.alive?
              end
            end

          end

              Actor.spawn用于啟動一個actor,內(nèi)部其實是創(chuàng)建了一個fiber并包裝成actor給用戶,每個actor一被創(chuàng)建就加入調(diào)度器的等待隊列。<<方法用于向actor傳遞消息,傳遞消息后,該actor也將加入等待隊列,等待被調(diào)度。

              我們的簡化版actor庫已經(jīng)寫完了,可以嘗試寫幾個例子,最簡單的hello world:
          include FiberActor

          Actor.spawn { puts 
          "hello world!"}
               輸出:
          hello world!

              沒有問題,那么試試傳遞消息:
          actor=Actor.spawn{
             Actor.receive{ 
          |msg|  puts "receive #{msg}"}
          }
          actor 
          << :test_message
              輸出:
          receive test_message
              
              也成了,那么試試兩個actor互相傳遞消息,乒乓一下下:
          pong=Actor.spawn do
                Actor.receive do 
          |ping|
                  
          #收到ping,返回pong
                  ping << :pong
                end
              end
          ping
          =Actor.spawn do
                
          #ping一下,將ping作為消息傳遞
                pong << Actor.current
                Actor.receive do 
          |msg|
                  
          #接收到pong
                  puts "ping #{msg}"
                end
              end
          #resume調(diào)度器
          Actor.scheduler.reschedule

               輸出:
          ping pong
              
               都沒有問題,這個超級簡單actor基本完成了。可以看到,利用coroutine來實現(xiàn)actor是完全可行的,事實上我這里描述的實現(xiàn)基本上是revactor這個庫的實現(xiàn)原理。revactor是一個ruby的actor庫,它的實現(xiàn)就是基于Fiber,并且支持消息的模式匹配和thread之間的actor調(diào)度,有興趣地可以去玩下。更進一步,其實采用輕量級協(xié)程來模擬actor風(fēng)格早就不是新鮮主意,比如在cn-erlounge的第四次會議上就有兩個topic是關(guān)于這個,一個是51.com利用基于ucontext的實現(xiàn)的類erlang進程模型,一個是許世偉的CERL。可以想見,他們的基本原理跟本文所描述不會有太大差別,那么面對的問題也是一樣。

               采用coroutine實現(xiàn)actor的主要缺點如下:
          1、因為是非搶占式,這就要求actor不能有阻塞操作,任何阻塞操作都需要異步化。IO可以使用異步IO,沒有os原生支持的就需要利用線程池,基本上是一個重復(fù)造輪子的過程。
          2、異常的隔離,某個actor的異常不能影響到調(diào)度器的運轉(zhuǎn),簡單的try...catch是不夠的。
          3、多核的利用,調(diào)度器只能跑在一個線程上,無法充分利用多核優(yōu)勢。
          4、效率因素,在actor數(shù)量劇增的情況下,簡單的FIFO的調(diào)度策略效率是個瓶頸,盡管coroutine的切換已經(jīng)非常高效。

              當(dāng)然,上面提到的這些問題并非無法解決,例如可以使用多線程多個調(diào)度器,類似erlang smp那樣來解決單個調(diào)度器的問題。但是如調(diào)度效率這樣的問題是很難解決的。相反,erlang的actor實現(xiàn)就不是通過coroutine,而是自己實現(xiàn)一套類似os的調(diào)度程序。
              首先明確一點,Erlang的process的調(diào)度是搶占式的,而非couroutine的協(xié)作式的。其次,Erlang早期版本是只有一個調(diào)度器,運行在一個線程上,隨著erts的發(fā)展,現(xiàn)在erlang的調(diào)度器已經(jīng)支持smp,每個cpu關(guān)聯(lián)一個調(diào)度器,并且可以明確指定哪個調(diào)度器綁定到哪個cpu上。第三,Erlang的調(diào)度也是采用優(yōu)先隊列+時間片輪詢的方式,每個調(diào)度器關(guān)聯(lián)一個ErtsRunQueueErtsRunQueue內(nèi)部又分為三個ErtsRunPrioQueue隊列,分別對應(yīng)high,max和normal,low的優(yōu)先級,其中normal和low共用一個隊列;在Erlang中時間片是以reduction為單位,你可以將reduction理解成一次函數(shù)調(diào)用,每個被調(diào)度的process能執(zhí)行的reduction次數(shù)是有限的。調(diào)度器每次都是從max隊列開始尋找等待調(diào)度的process并執(zhí)行,當(dāng)前調(diào)度的隊列如果為空或者執(zhí)行的reductions超過限制,那么就降低優(yōu)先級,調(diào)度下一個隊列。

             從上面的描述可以看出,Erlang優(yōu)秀的地方不僅在于actor風(fēng)格的輕量級process,另一個強悍的地方就是它的類os的調(diào)度器,再加上OTP庫的完美支持,這不是一般方案能山寨的。
              
              
            

          評論

          # re: Ruby Fiber指南(五): 實現(xiàn)Actor,兼談Erlang的process調(diào)度  回復(fù)  更多評論   

          2013-11-04 02:39 by 我傻逼我自豪
          協(xié)程=>流程控制
          actor=>對象抽象
          主站蜘蛛池模板: 东乡县| 西宁市| 军事| 峨眉山市| 富民县| 滨海县| 钦州市| 梅河口市| 兴义市| 志丹县| 福清市| 林州市| 仪陇县| 阿坝县| 灵川县| 横峰县| 昌图县| 浑源县| 蓝田县| 府谷县| 兴山县| 邵东县| 昌图县| 谢通门县| 马鞍山市| 托克逊县| 泸定县| 丹东市| 简阳市| 五常市| 谷城县| 贵州省| 敖汉旗| 乐平市| 枝江市| 临武县| 兴海县| 沙湾县| 集贤县| 江阴市| 确山县|