莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          Clojure的并發(四)Agent深入分析和Actor

          Posted on 2010-07-19 18:48 dennis 閱讀(4143) 評論(0)  編輯  收藏 所屬分類: javaClojure
          Clojure 的并發(一) Ref和STM
          Clojure 的并發(二)Write Skew分析
          Clojure 的并發(三)Atom、緩存和性能
          Clojure 的并發(四)Agent深入分析和Actor
          Clojure 的并發(五)binding和let
          Clojure的并發(六)Agent可以改進的地方
          Clojure的并發(七)pmap、pvalues和pcalls
          Clojure的并發(八)future、promise和線程

          四、 Agent和Actor

             除了用于協調同步的Ref,獨立同步的Ref,還有一類非常常見的需求:你可能希望狀態的更新是異步,你通常不關心更新的結果,這時候你可以考慮下使用Agent。

          1、創建agent:

          user=> (def counter (agent 0))
          #'user/counter

          user
          => counter
          #<Agent@9444d1: 0>


          通過agent函數你就可以創建一個agent,指向一個不可變的初始狀態。

          2、取agent的值,這跟Ref和Atom沒啥兩樣,都是通過deref或者@宏:
          user=> @counter
          0
          user
          => (deref counter)
          0

          3、更新agent,通過send或者send-off函數給agent發送任務去更新agent:
          user=> (send counter inc)
          #<Agent@9444d1: 0>

            send返回agent對象,內部的值仍然是0,而非inc遞增之后的1,這是因為send是異步發送,更新是在另一個線程執行,兩個線程(REPL主線程和更新任務的線程)的執行順序沒有同步,顯示什么取決于兩者誰更快。更新肯定是發生了,查看counter的值:
          user=> @counter
          1

             果然更新到了1了。send的方法簽名:
          (send a f & args)

             其中f是更新的函數,它的定義如下:
          (f state-of-agent & args)
             也就是它會在第一個參數接收當前agent的狀態,而args是send附帶的參數。

             還有個方法,send-off,它的作用于send類似:
          user=> (send-off counter inc)
          #<Agent@9444d1: 1>
          user=> @counter
          2

             send和send-off的區別在于,send是將任務交給一個固定大小的線程池執行
          final public static ExecutorService pooledExecutor =
                  Executors
          .newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
             默認線程池大小是CPU核數加上2。因此send執行的任務最好不要有阻塞的操作。而send-off則使用沒有大小限制(取決于內存)的線程池:

          final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
            
             因此,send-off比較適合任務有阻塞的操作,如IO讀寫之類。請注意,所有的agent是共用這些線程池,這從這些線程池的定義看出來,都是靜態變量。

          4、異步轉同步
          ,剛才提到send和send-off都是異步將任務提交給線程池去處理,如果你希望同步等待結果返回,那么可以使用await函數:
           (do (send counter inc) (await counter) (println @counter))

          send一個任務之后,調用await等待agent所有派發的更新任務結束,然后打印agent的值。await是阻塞當前線程,直到至今為止所有任務派發執行完畢才返回。await沒有超時,會一直等待直到條件滿足,await-for則可以接受等待的超時時間,如果超過指定時間沒有返回,則返回nil,否則返回結果。
           (do (send counter inc) (await-for 100 counter) (println @counter))

          await-for接受的單位是毫秒。

          5、錯誤處理


             agent也可以跟Ref和Atom一樣設置validator,用于約束驗證。由于agent的更新是異步的,你不知道更新的時候agent是否發生異常,只有等到你去取值或者更新的時候才能發現:
          user=> (def counter (agent 0 :validator number?))
          #
          'user/counter

          user
          => (send counter (fn[_] "foo"))
          #
          <clojure.lang.Agent@4de8ce62: 0>

             強制要求counter的值是數值類型,第二個表達式我們給counter發送了一個更新任務,想將狀態更新為字符串"foo",由于是異步更新,返回的結果可能沒有顯示異常,當你取值的時候,問題出現了:
          user=> @counter
          java.lang.Exception: Agent has errors (NO_SOURCE_FILE:
          0)

            告訴你agent處于不正常的狀態,如果你想獲取詳細信息,可以通過agent-errors函數:
          user=> (.printStackTrace (agent-errors counter))
          java.lang.IllegalArgumentException: No matching field found: printStackTrace 
          for class clojure.lang.PersistentList (NO_SOURCE_FILE:0)

             你可以恢復agent到前一個正常的狀態,通過clear-agent-errors函數:
           
          user=> (clear-agent-errors counter)
          nil
          user
          => @counter
          0

          6、加入事務

          agent跟atom不一樣,agent可以加入事務,在事務里調用send發送一個任務,當事務成功的時候該任務將只會被發送一次,最多最少都一次。利用這個特性,我們可以實現在事務操作的時候寫文件,達到ACID中的D——持久性的目的:
          (def backup-agent (agent "output/messages-backup.clj" ))
          (def messages (ref []))
          (use 
          '[clojure.contrib.duck-streams :only (spit)])
          (defn add-message-with-backup [msg]
                 (dosync
                     (let [snapshot (commute messages conj msg)]
                          (send
          -off backup-agent (fn [filename]
                                                  (spit filename snapshot)
                                                  filename))
                     snapshot)))

          定義了一個backup-agent用于保存消息,add-message-with-backup函數首先將狀態保存到messages,這是個普通的Ref,然后調用send-off給backup-agent一個任務:
           (fn [filename]
                    (spit filename snapshot)
                   filename)
          這個任務是一個匿名函數,它利用spit打開文件,寫入當前的快照,并且關閉文件,文件名來自backup-agent的狀態值。注意到,我們是用send-off,send-off利用cache線程池,哪怕阻塞也沒關系。

          利用事務加上一個backup-agent可以實現類似數據庫的ACID,但是還是不同的,主要區別在于backup-agent的更新是異步,并不保證一定寫入文件,因此持久性也沒辦法得到保證。

          7、關閉線程池:


          前面提到agent的更新都是交給線程池去處理,在系統關閉的時候你需要關閉這兩個線程吃,通過shutdown-agents方法,你再添加任務將被拒絕:
          user=> (shutdown-agents)
          nil
          user
          => (send counter inc)
          java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
          0)
          user
          => (def counter (agent 0))
          #
          'user/counter
          user=> (send counter inc)    
          java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
          0)

          哪怕我重新創建了counter,提交任務仍然被拒絕,進一步證明這些線程池是全局共享的。

          8、原理淺析

          前文其實已經將agent的實現原理大體都說了,agent本身只是個普通的java對象,它的內部維持一個狀態和一個隊列:
              volatile Object state;
              AtomicReference
          <IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);


          任務提交的時候,是封裝成Action對象,添加到此隊列

              
          public Object dispatch(IFn fn, ISeq args, boolean solo) {
                  
          if (errors != null) {
                      
          throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
                  }
                  
          //封裝成action對象
                  Action action = new Action(this, fn, args, solo);
                  dispatchAction(action);

                  
          return this;
              }


              
          static void dispatchAction(Action action) {
                  LockingTransaction trans 
          = LockingTransaction.getRunning();
                  
          // 有事務,加入事務
                  if (trans != null)
                      trans.enqueue(action);
                  
          else if (nested.get() != null) {
                      nested.set(nested.get().cons(action));
                  }
                  
          else {
                      
          // 入隊
                      action.agent.enqueue(action);
                  }
              }

          send和send-off都是調用Agent的dispatch方法,只是兩者的參數不一樣,dispatch的第二個參數 solo決定了是使用哪個線程池處理action:
          (defn send
            [#
          ^clojure.lang.Agent a f & args]
              (. a (dispatch f args 
          false)))

          (defn send
          -off
            [#
          ^clojure.lang.Agent a f & args]
              (. a (dispatch f args 
          true)))

          send-off將solo設置為true,當為true的時候使用cache線程池:

             
          final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();

              
          final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();

                  
          void execute() {
                      
          if (solo)
                          soloExecutor.execute(
          this);
                      
          else
                          pooledExecutor.execute(
          this);
                  }

          執行的時候調用更新函數并設置新的狀態:

          try {
                              Object oldval 
          = action.agent.state;
                              Object newval 
          = action.fn.applyTo(RT.cons(action.agent.state, action.args));
                              action.agent.setState(newval);
                              action.agent.notifyWatches(oldval, newval);
                          }
                          
          catch (Throwable e) {
                              
          // todo report/callback
                              action.agent.errors = RT.cons(e, action.agent.errors);
                              hadError 
          = true;
                          }

          9、跟actor的比較:

          Agent跟Actor有一個顯著的不同,agent的action來自于別人發送的任務附帶的更新函數,而actor的action則是自身邏輯的一部分。因此,如果想用agent實現actor模型還是相當困難的,下面是我的一個嘗試:

          (ns actor)

          (defn receive [
          & args]
             (apply hash
          -map args))
          (defn self [] 
          *agent*)

          (defn spawn [recv
          -map]
              (agent recv
          -map))

          (defn 
          ! [actor msg]
              (send actor #(apply (get 
          %1 %2)  (vector %2)) msg))
          ;;啟動一個actor
          (def actor (spawn 
                       (receive :hello #(println 
          "receive "%))))
          ;;發送消息 hello
          (
          ! actor :hello)

             利用spawn啟動一個actor,其實本質上是一個agent,而發送通過感嘆號!,給agent發送一個更新任務,它從recv-map中查找消息對應的處理函數并將消息作為參數來執行。難點在于消息匹配,匹配這種簡單類型的消息沒有問題,但是如果匹配用到變量,暫時沒有想到好的思路實現,例如實現兩個actor的ping/pong。

           
          主站蜘蛛池模板: 荥阳市| 南和县| 宣城市| 苗栗县| 兴国县| 罗源县| 苏尼特右旗| 偏关县| 铜鼓县| 定州市| 阳信县| 清远市| 中阳县| 温宿县| 阜阳市| 密云县| 出国| 敦煌市| 酒泉市| 阳朔县| 淮滨县| 谢通门县| 万山特区| 陇川县| 岳池县| 穆棱市| 西和县| 罗甸县| 军事| 大关县| 无锡市| 平塘县| 雷州市| 佛冈县| 行唐县| 襄汾县| 泗阳县| 增城市| 宝鸡市| 隆尧县| 北宁市|