莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          Clojure 的并發(一) Ref和STM
          Clojure 的并發(二)Write Skew分析
          Clojure 的并發(三)Atom、緩存和性能
          Clojure 的并發(四)Agent深入分析和Actor
          Clojure 的并發(五)binding和let
          Clojure的并發(六)Agent可以改進的地方
          Clojure的并發(七)pmap、pvalues和pcalls
          Clojure的并發(八)future、promise和線程

          四、 Agent和Actor

             除了用于協調同步的Ref,獨立同步的Ref,還有一類非常常見的需求:你可能希望狀態的更新是異步,你通常不關心更新的結果,這時候你可以考慮下使用Agent。

          1、創建agent:

          user=> (def counter (agent 0))
          #'user/counter

          user
          => counter
          #<Agent@9444d1: 0>


          通過agent函數你就可以創建一個agent,指向一個不可變的初始狀態。

          2、取agent的值,這跟Ref和Atom沒啥兩樣,都是通過deref或者@宏:
          user=> @counter
          0
          user
          => (deref counter)
          0

          3、更新agent,通過send或者send-off函數給agent發送任務去更新agent:
          user=> (send counter inc)
          #<Agent@9444d1: 0>

            send返回agent對象,內部的值仍然是0,而非inc遞增之后的1,這是因為send是異步發送,更新是在另一個線程執行,兩個線程(REPL主線程和更新任務的線程)的執行順序沒有同步,顯示什么取決于兩者誰更快。更新肯定是發生了,查看counter的值:
          user=> @counter
          1

             果然更新到了1了。send的方法簽名:
          (send a f & args)

             其中f是更新的函數,它的定義如下:
          (f state-of-agent & args)
             也就是它會在第一個參數接收當前agent的狀態,而args是send附帶的參數。

             還有個方法,send-off,它的作用于send類似:
          user=> (send-off counter inc)
          #<Agent@9444d1: 1>
          user=> @counter
          2

             send和send-off的區別在于,send是將任務交給一個固定大小的線程池執行
          final public static ExecutorService pooledExecutor =
                  Executors
          .newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
             默認線程池大小是CPU核數加上2。因此send執行的任務最好不要有阻塞的操作。而send-off則使用沒有大小限制(取決于內存)的線程池:

          final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
            
             因此,send-off比較適合任務有阻塞的操作,如IO讀寫之類。請注意,所有的agent是共用這些線程池,這從這些線程池的定義看出來,都是靜態變量。

          4、異步轉同步
          ,剛才提到send和send-off都是異步將任務提交給線程池去處理,如果你希望同步等待結果返回,那么可以使用await函數:
           (do (send counter inc) (await counter) (println @counter))

          send一個任務之后,調用await等待agent所有派發的更新任務結束,然后打印agent的值。await是阻塞當前線程,直到至今為止所有任務派發執行完畢才返回。await沒有超時,會一直等待直到條件滿足,await-for則可以接受等待的超時時間,如果超過指定時間沒有返回,則返回nil,否則返回結果。
           (do (send counter inc) (await-for 100 counter) (println @counter))

          await-for接受的單位是毫秒。

          5、錯誤處理


             agent也可以跟Ref和Atom一樣設置validator,用于約束驗證。由于agent的更新是異步的,你不知道更新的時候agent是否發生異常,只有等到你去取值或者更新的時候才能發現:
          user=> (def counter (agent 0 :validator number?))
          #
          'user/counter

          user
          => (send counter (fn[_] "foo"))
          #
          <clojure.lang.Agent@4de8ce62: 0>

             強制要求counter的值是數值類型,第二個表達式我們給counter發送了一個更新任務,想將狀態更新為字符串"foo",由于是異步更新,返回的結果可能沒有顯示異常,當你取值的時候,問題出現了:
          user=> @counter
          java.lang.Exception: Agent has errors (NO_SOURCE_FILE:
          0)

            告訴你agent處于不正常的狀態,如果你想獲取詳細信息,可以通過agent-errors函數:
          user=> (.printStackTrace (agent-errors counter))
          java.lang.IllegalArgumentException: No matching field found: printStackTrace 
          for class clojure.lang.PersistentList (NO_SOURCE_FILE:0)

             你可以恢復agent到前一個正常的狀態,通過clear-agent-errors函數:
           
          user=> (clear-agent-errors counter)
          nil
          user
          => @counter
          0

          6、加入事務

          agent跟atom不一樣,agent可以加入事務,在事務里調用send發送一個任務,當事務成功的時候該任務將只會被發送一次,最多最少都一次。利用這個特性,我們可以實現在事務操作的時候寫文件,達到ACID中的D——持久性的目的:
          (def backup-agent (agent "output/messages-backup.clj" ))
          (def messages (ref []))
          (use 
          '[clojure.contrib.duck-streams :only (spit)])
          (defn add-message-with-backup [msg]
                 (dosync
                     (let [snapshot (commute messages conj msg)]
                          (send
          -off backup-agent (fn [filename]
                                                  (spit filename snapshot)
                                                  filename))
                     snapshot)))

          定義了一個backup-agent用于保存消息,add-message-with-backup函數首先將狀態保存到messages,這是個普通的Ref,然后調用send-off給backup-agent一個任務:
           (fn [filename]
                    (spit filename snapshot)
                   filename)
          這個任務是一個匿名函數,它利用spit打開文件,寫入當前的快照,并且關閉文件,文件名來自backup-agent的狀態值。注意到,我們是用send-off,send-off利用cache線程池,哪怕阻塞也沒關系。

          利用事務加上一個backup-agent可以實現類似數據庫的ACID,但是還是不同的,主要區別在于backup-agent的更新是異步,并不保證一定寫入文件,因此持久性也沒辦法得到保證。

          7、關閉線程池:


          前面提到agent的更新都是交給線程池去處理,在系統關閉的時候你需要關閉這兩個線程吃,通過shutdown-agents方法,你再添加任務將被拒絕:
          user=> (shutdown-agents)
          nil
          user
          => (send counter inc)
          java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
          0)
          user
          => (def counter (agent 0))
          #
          'user/counter
          user=> (send counter inc)    
          java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:
          0)

          哪怕我重新創建了counter,提交任務仍然被拒絕,進一步證明這些線程池是全局共享的。

          8、原理淺析

          前文其實已經將agent的實現原理大體都說了,agent本身只是個普通的java對象,它的內部維持一個狀態和一個隊列:
              volatile Object state;
              AtomicReference
          <IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);


          任務提交的時候,是封裝成Action對象,添加到此隊列

              
          public Object dispatch(IFn fn, ISeq args, boolean solo) {
                  
          if (errors != null) {
                      
          throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
                  }
                  
          //封裝成action對象
                  Action action = new Action(this, fn, args, solo);
                  dispatchAction(action);

                  
          return this;
              }


              
          static void dispatchAction(Action action) {
                  LockingTransaction trans 
          = LockingTransaction.getRunning();
                  
          // 有事務,加入事務
                  if (trans != null)
                      trans.enqueue(action);
                  
          else if (nested.get() != null) {
                      nested.set(nested.get().cons(action));
                  }
                  
          else {
                      
          // 入隊
                      action.agent.enqueue(action);
                  }
              }

          send和send-off都是調用Agent的dispatch方法,只是兩者的參數不一樣,dispatch的第二個參數 solo決定了是使用哪個線程池處理action:
          (defn send
            [#
          ^clojure.lang.Agent a f & args]
              (. a (dispatch f args 
          false)))

          (defn send
          -off
            [#
          ^clojure.lang.Agent a f & args]
              (. a (dispatch f args 
          true)))

          send-off將solo設置為true,當為true的時候使用cache線程池:

             
          final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();

              
          final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();

                  
          void execute() {
                      
          if (solo)
                          soloExecutor.execute(
          this);
                      
          else
                          pooledExecutor.execute(
          this);
                  }

          執行的時候調用更新函數并設置新的狀態:

          try {
                              Object oldval 
          = action.agent.state;
                              Object newval 
          = action.fn.applyTo(RT.cons(action.agent.state, action.args));
                              action.agent.setState(newval);
                              action.agent.notifyWatches(oldval, newval);
                          }
                          
          catch (Throwable e) {
                              
          // todo report/callback
                              action.agent.errors = RT.cons(e, action.agent.errors);
                              hadError 
          = true;
                          }

          9、跟actor的比較:

          Agent跟Actor有一個顯著的不同,agent的action來自于別人發送的任務附帶的更新函數,而actor的action則是自身邏輯的一部分。因此,如果想用agent實現actor模型還是相當困難的,下面是我的一個嘗試:

          (ns actor)

          (defn receive [
          & args]
             (apply hash
          -map args))
          (defn self [] 
          *agent*)

          (defn spawn [recv
          -map]
              (agent recv
          -map))

          (defn 
          ! [actor msg]
              (send actor #(apply (get 
          %1 %2)  (vector %2)) msg))
          ;;啟動一個actor
          (def actor (spawn 
                       (receive :hello #(println 
          "receive "%))))
          ;;發送消息 hello
          (
          ! actor :hello)

             利用spawn啟動一個actor,其實本質上是一個agent,而發送通過感嘆號!,給agent發送一個更新任務,它從recv-map中查找消息對應的處理函數并將消息作為參數來執行。難點在于消息匹配,匹配這種簡單類型的消息沒有問題,但是如果匹配用到變量,暫時沒有想到好的思路實現,例如實現兩個actor的ping/pong。

           

          posted @ 2010-07-19 18:48 dennis 閱讀(4143) | 評論 (0)編輯 收藏

          Clojure 的并發(一) Ref和STM
          Clojure 的并發(二)Write Skew分析
          Clojure 的并發(三)Atom、緩存和性能
          Clojure 的并發(四)Agent深入分析和Actor
          Clojure 的并發(五)binding和let
          Clojure的并發(六)Agent可以改進的地方
          Clojure的并發(七)pmap、pvalues和pcalls
          Clojure的并發(八)future、promise和線程

          三、Atom和緩存

              Ref適用的場景是系統中存在多個相互關聯的狀態,他們需要一起更新,因此需要通過dosync做事務包裝。但是如果你有一個狀態變量,不需要跟其他狀態變量協作,這時候應該使用Atom了。可以將一個Atom和一個Ref一起在一個事務里更新嗎?這沒辦法做到,如果你需要相互協作,你只能使用Ref。Atom適用的場景是狀態是獨立,沒有依賴,它避免了與其他Ref交互的開銷,因此性能會更好,特別是對于讀來說。

           1、定義Atom,采用atom函數,賦予一個初始狀態:
          (def mem (atom {}))

          這里將mem的初始狀態定義為一個map。

          2、deref和@:可以用deref函數,也可以簡單地用宏@,這跟Ref一樣,取atom的值:
          @mem         => {}
          (deref mem)  => {}

          3、reset!:重新設置atom的值,不關心當前值是什么:
           (reset! mem {:1})

          查看mem:
          user=> @mem
          {
          :1}
          已經更新到新的map了。

          4、swap!:如果你的更新需要依賴當前的狀態值,或者只想更新狀態的某個部分,那么就需要使用swap!(類似alter):
          (swap! an-atom f & args)

          swap! 將函數f作用于當前狀態值和額外的參數args之上,形成新的狀態值,例如我們給mem加上一個keyword:
          user=> (swap! mem assoc :2)
          {
          :2, :1}

          看到,:b 2被加入了當前的map。

          5、compare and set:
          類似原子變量AtomicInteger之類,atom也可以做compare and set的操作:
          (compare-and-set! atom oldValue newValue)

          當且僅當atom的當前狀態值等于oldValue的時候,將狀態值更新為newValue,并返回一個布爾值表示成功或者失敗:
          user=> (def c (atom 1))
          #'user/c
          user=> (compare-and-set! c 2 3)
          false
          user
          => (compare-and-set! c 1 3)
          true
          user
          => @c
          3


          6、緩存和atom:
          (1)atom非常適合實現緩存,緩存通常不會跟其他系統狀態形成依賴,并且緩存對讀的速度要求更高。上面例子中用到的mem其實就是個簡單的緩存例子,我們來實現一個putm和getm函數:
          ;;創建緩存
          (defn make
          -cache [] (atom {}))

          ;;放入緩存
          (defn putm [cache key value] (swap
          ! cache assoc key value))

          ;;取出
          (defn getm [cache key] (key 
          @cache))


             這里key要求是keyword,keyword是類似:a這樣的字符序列,你熟悉ruby的話,可以暫時理解成symbol。使用這些API:
          user=> (def cache (make-cache))
          #'user/cache
          user=> (putm cache :1)
          {
          :1}
          user
          => (getm cache :a)
          1
          user
          => (putm cache :2)
          {
          :2, :1}
          user
          => (getm cache :b)
          2

          (2)memoize函數作用于函數f,產生一個新函數,新函數內部保存了一個緩存,緩存從參數到結果的映射。第一次調用的時候,發現緩存沒有,就會調用f去計算實際的結果,并放入內部的緩存;下次調用同樣的參數的時候,就直接從緩存中取,而不用再次調用f,從而達到提升計算效率的目的。
          memoize的實現就是基于atom,查看源碼:
          (defn memoize
            [f]
            (let [mem (atom {})]
              (fn [
          & args]
                (
          if-let [e (find @mem args)]
                  (val e)
                  (let [ret (apply f args)]
                    (swap
          ! mem assoc args ret)
                    ret)))))

          內部的緩存名為mem,memoize返回的是一個匿名函數,它接收原有的f函數的參數,if-let判斷綁定的變量e是否存在,變量e是通過find從緩存中查詢args得到的項,如果存在的話,調用val得到真正的結果并返回;如果不存在,那么使用apply函數將f作用于參數列表之上,計算出結果,并利用swap!將結果加入mem緩存,返回計算結果。

          7、性能測試
          使用atom實現一個計數器,和使用java.util.concurrent.AtomicInteger做計數器,做一個性能比較,各啟動100個線程,每個線程執行100萬次原子遞增,計算各自的耗時,測試程序如下,代碼有注釋,不再羅嗦:

          (ns atom-perf)
          (
          import 'java.util.concurrent.atomic.AtomicInteger)
          (import 'java.util.concurrent.CountDownLatch)

          (
          def a (AtomicInteger. 0))
          (
          def b (atom 0))

          ;;為了性能,給java加入type hint
          (defn java
          -inc [#^AtomicInteger counter] (.incrementAndGet counter))
          (defn countdown-latch [#^CountDownLatch latch] (.countDown latch))

          ;;單線程執行緩存次數
          (
          def max_count 1000000)
          ;;線程數 
          (
          def thread_count 100)

          (defn benchmark [fun]
            (let [ latch (CountDownLatch. thread_count)  ;;關卡鎖
                   start (System
          /currentTimeMillis) ]     ;;啟動時間
                 (dotimes [_ thread_count] (.start (Thread. 
          #(do (dotimes [_ max_count] (fun)) (countdown-latch latch))))) 
                 (.await latch)
                 (
          - (System/currentTimeMillis) start)))
                   

          (println 
          "atom:" (benchmark #(swap! b inc)))
          (println "AtomicInteger:" (benchmark #(java-inc a)))

          (println (.get a))
          (println @b)

              默認clojure調用java都是通過反射,加入type hint之后編譯的字節碼就跟java編譯器的一致,為了比較公平,定義了java-inc用于調用AtomicInteger.incrementAndGet方法,定義countdown-latch用于調用CountDownLatch.countDown方法,兩者都為參數添加了type hint。如果不采用type hint,AtomicInteger反射調用的效率是非常低的。

          測試下來,在我的ubuntu上,AtomicInteger還是占優,基本上比atom的實現快上一倍:

          atom: 9002
          AtomicInteger: 
          4185
          100000000
          100000000

          照我的理解,這是由于AtomicInteger調用的是native的方法,基于硬件原語做cas,而atom則是用戶空間內的clojure自己做的CAS,兩者的性能有差距不出意料之外。

          看了源碼,Atom是基于java.util.concurrent.atomic.AtomicReference實現的,調用的方法是
           public final boolean compareAndSet(V expect, V update) {
                  
          return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
              }

          而AtomicInteger調用的方法是:

              public final boolean compareAndSet(int expect, int update) {
              
          return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
              }

          兩者的效率差距有這么大嗎?暫時存疑。
             

          posted @ 2010-07-17 13:58 dennis 閱讀(6139) | 評論 (3)編輯 收藏

          Clojure 的并發(一) Ref和STM
          Clojure 的并發(二)Write Skew分析
          Clojure 的并發(三)Atom、緩存和性能
          Clojure 的并發(四)Agent深入分析和Actor
          Clojure 的并發(五)binding和let
          Clojure的并發(六)Agent可以改進的地方
          Clojure的并發(七)pmap、pvalues和pcalls
          Clojure的并發(八)future、promise和線程

               在介紹Ref的上一篇blog提到,基于snapshot做隔離的MVCC實現來說,有個現象,叫寫偏序——Write Skew。根本的原因是由于每個事務在更新過程中無法看到其他事務的更改的結果,導致各個事務提交之后的最終結果違反了一致性。為了理解這個現象,最好的辦法是在代碼中復現這個現象。考慮下列這個場景:
             屁民Peter有兩個賬戶account1和account2,簡稱為A1和A2,這兩個賬戶各有100塊錢,一個顯然的約束就是這兩個賬戶的余額之和必須大于或者等于零,銀行肯定不能讓你賺了去,你也怕成為下個許霆。現在,假設有兩個事務T1和T2,T1從A1提取200塊錢,T2則從A2提取200塊錢。如果這兩個事務按照先后順序進行,后面執行的事務判斷A1+A2-200>=0約束的時候發現失敗,那么就不會執行,保證了一致性和隔離性。但是基于多版本并發控制的Clojure,這兩個事務完全可能并發地執行,因為他們都是基于一個當前賬戶的快照做更新的, 并且在更新過程中無法看到對方的修改結果,T1執行的時候判斷A1+A2-200>=0約束成立,從A1扣除了200塊;同樣,T2查看當前快照也滿足約束A1+A2-200>=0,從A2扣除了200塊,問題來了,最終的結果是A1和A2都成-100塊了,身為屁民的你竟然從銀行多拿了200塊,你等著無期吧。

             現在,我們就來模擬這個現象,定義兩個賬戶:

          ;;兩個賬戶,約束是兩個賬戶的余額之和必須>=0
          (def account1 (
          ref 100))
          (def account2 (
          ref 100))

             定義一個取錢方法:
          ;;定義扣除函數
          (defn deduct [account n other]
                (dosync 
                    (
          if (>= (+ (- @account n) @other0)
                        (alter account 
          - n))))

             其中account是將要扣錢的帳號,other是peter的另一個帳號,在執行扣除前要滿足約束@account-n+@other>=0

             接下來就是搞測試了,各啟動N個線程嘗試從A1和A2扣錢,為了盡快模擬出問題,使得并發程度高一些,我們將線程設置大一些,并且使用java.util.concurrent.CyclicBarrier做關卡,測試代碼如下:

          ;;設定關卡
          (def barrier (java
          .util.concurrent.CyclicBarrier. 6001))
          ;;各啟動3000個線程嘗試去從賬戶1和賬戶2扣除200
          (dotimes [_ 
          3000] (.start (Thread. #(do (.await  barrier) (deduct account1 200 account2) (.await  barrier)))))
          (dotimes [_ 3000] (.start (Thread. #(do (.await  barrier) (deduct account2 200 account1) (.await  barrier)))))

          (
          .await barrier)

          (
          .await barrier)
          ;;打印最終結果
          (println 
          @account1)
          (println 
          @account2)

               線程里干了三件事情:首先調用barrier.await嘗試突破關卡,所有線程啟動后沖破關卡,進入扣錢環節deduct,最后再調用barrier.await用于等待所有線程結束。在所有線程結束后,打印當前賬戶的余額。

               這段代碼在我的機器上每執行10次左右都至少有一次打印:
          -100
          -100
             
              這表示A1和A2的賬戶都欠下了100塊錢,完全違反了約束條件,法庭的傳票在召喚peter。

              那么怎么防止write skew現象呢?如果我們能在事務過程中保護某些Ref不被其他事務修改,那么就可以保證當前的snapshot的一致性,最終保證結果的一致性。通過ensure函數即可保護Ref,稍微修改下deduct函數:
          (defn deduct [account n other]
                (dosync (ensure account) (ensure other)
                    (
          if (>= (+ (- @account n) @other0)
                        (alter account 
          - n))))

             在執行事務更新前,先通過ensure保護下account和other賬戶不被其他事務修改。你可以再多次運行看看,會不會再次打印非法結果。

             上篇blog最后也提到了一個士兵巡邏的例子來介紹write skew,我也寫了段代碼來模擬那個例子,有興趣可以跑跑,非法結果是三個軍營的士兵之和小于100(兩個軍營最后只剩下25個人)。

          ;1號軍營
          (def g1 (
          ref 45))
          ;2號軍營
          (def g2 (
          ref 45))
          ;3號軍營
          (def g3 (
          ref 45))
          ;從1號軍營抽調士兵
          (defn dispatch
          -patrol-g1 [n]
              (dosync 
                (
          if (> (+ (- @g1 n) @g2 @g3100)
                    (alter g1 
          - 20)
                  ))
                )
          ;從2號軍營抽調士兵
          (defn dispatch
          -patrol-g2 [n]
              (dosync 
                (
          if (> (+ @g1 (- @g2 n) @g3100)
                    (alter g2 
          - 20)
                  ))
                )
          ;;設定關卡
          (def barrier (java
          .util.concurrent.CyclicBarrier. 4001))
          ;;各啟動2000個線程嘗試去從1號和2號軍營抽調20個士兵
          (dotimes [_ 
          2000] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g1 20) (.await  barrier)))))
          (dotimes [_ 2000] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g2 20) (.await  barrier)))))
          ;(dotimes [_ 10] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g3 20) (.await  barrier)))))

          (
          .await barrier)

          (
          .await barrier)
          ;;打印最終結果
          (println 
          @g1)
          (println 
          @g2)
          (println 
          @g3)




          posted @ 2010-07-17 05:44 dennis 閱讀(4968) | 評論 (1)編輯 收藏

              我估計我不寫這樣的標題,吸引不了人氣。問題的起因是Javaeye的一個帖子《淘寶面試題:如何充分利用多核CPU,計算很大的 List中所有整數的和》,看見為了這么個問題寫長長的Java代碼,讓我十分蛋疼。為了表示蛋定,我想介紹下用Clojure解決這個問題的方法。

              題目很明確了,要你充分多核,這年頭不“多核”不好意思出去跟人打招呼,為了多核,你要將list拆分下,每個子list并發去計算和,然后綜合這些結果求出最終的和,您沒搞錯,這不是傳說中人見人愛的MapReduce嗎?你沒聽過?不好意思,你out了。

              咱們先別著急并發,先搞個不并發的版本試試看,第一步,切分list,實在不好意思,clojure有現成的clojure.core/partition函數:
          user=> (partition 3 3 [0'(1 2 3 4 5 6 7 8 9 10))
          ((1 2 3) (4 5 6) (7 8 9) (10 0))

             牛刀小試,將(1 2 3 4 5 6 7 8 9 10)切分成了3個子串,還有個可憐的犀利哥——10號同學沒辦法歸入任意一個組,只好讓他跟虛無的0為伴了。partition的第三個參數指定一個湊伙的集合,當無法完全切分的時候,拿這個集合里的元素湊合。但是我們不能隨便湊合啊,隨便湊合加起來結果就不對了,用0就沒問題了。

             切分完了,計算子集的和,那不要太簡單,該reduce同學上場,請大家歡呼、扔雞蛋,千萬別客氣:
          user=> (reduce + 0 '(1 2 3))
          6
            
              自然要reduce,總要指定規則怎么reduce,我們這里很簡單,就是個加法運算,再給初始值0就好咯,reduce萬歲。

              慢著,有個問題,partition返回的是集合的集合((1 2 3) (4 5 6) (7 8 9) (10 0)),而上面的reduce卻要作用在子集里,怎么辦?無敵的map大神出場了,map的作用是將某個函數作用于集合上,并且返回作用后的集合結果,這里要干的事情就是將上面的reduce作用在partition返回的集合的集合上面:

          user=> (map #(reduce + 0 % )(partition 3 3 [0'(1 2 3 4 5 6 7 8 9 10)))            
          (6 15 24 10)

              返回的是4個子集各自的和,答案肯定沒錯,最后一個結果不正是唯一的元素10嗎?這里可能比較費解的是#(reduce + 0 %),這其實定義了一個匿名函數,它接收一個參數,這個參數用百分號%默認指代,因為是將map作用于集合的集合,因此這里的%其實就是各個子集。

             map返回的是個集合,又要求集合的總和,是不是又該reduce上場了?不好意思,map同學,這次演出你就一跑龍套的:
          user=> (reduce + 0 (map #(reduce + 0 % )(partition 3 3 [0'(1 2 3 4 5 6 7 8 9 10))))
          55

              偉大的55出來了,它不是一個人在戰斗,這一刻LISP、Scheme、Erlang、Scala、Clojure、JavaScript靈魂附體,它繼承了FP的光榮傳統,干凈漂亮地解決了這個問題。

              綜合上面所述,我們給出一個非多核版本的解答:
          (defn mysum [coll n]
               (let [sub
          -colls   (partition n n [0] coll)
                     result
          -coll (map #(reduce + 0 %) sub-colls) ]
                   (reduce 
          + 0 result-coll)))
             
              我們是使用了let語句綁定變量,純粹是為了更好看懂一些。sub-colls綁定到partition返回的集合的集合,result-coll就是各個子集的結果組成的集合,#(reduce + 0 %)是個匿名函數,其中%指向匿名函數的第一個參數,也就是每個子集。最終,利用reduce將result-coll的結果綜合在一起。

              “我們要多核,我們要多核,我們不要西太平洋大學的野雞MapReduce"。

               臺下別激動,神奇的“多核”馬上出場,我要改動的代碼只是那么一點點,用pmap替代map
          (defn psum [coll n]
               (let [sub
          -colls   (partition n n [0] coll)
                     result
          -coll (pmap #(reduce + 0 %) sub-colls) ]
                   (reduce 
          + 0 result-coll)))

             完了嗎?真完了,你要改動的只有這么一點點,就可以讓切分出來的子集并發地計算了。(感謝網友@clojans的提醒)。

          以下是原文:
              首先是匿名函數改造一點點:
              我干嘛了,我就加了個future,給你個未來。嚴肅點地說,future將啟動一個單獨的線程去reduce子集。現在result-coll里不再是直接的結果,而是各個子集的Future對象,為了得到真正的和,你需要等待線程結束并取得結果,因此最后的reduce也要小小地改動下:
          (reduce #(+ %1 @%20 result-coll))

              reduce不再是簡單地用加號了,替代的則是一個兩個參數的匿名函數,第二個參數%2是Future對象,我們通過@操作符等待Future返回結果,并跟第一個參數%1(初始為0)作加法運算。

              最終的多核版本:
          (defn mysum2 [coll  n]
              (let [sub
          -colls   (partition n n [0] coll)
                    result
          -coll (map #(future (reduce + 0 %)) sub-colls)] 
                   (reduce #(
          + %1 @%20 result-coll)))

             這個多核版本跟非多核版本區別大嗎?不大嗎?大嗎?不大嗎?……
             可以看到,Clojure可以多么容易地在并發與非并發之間徘徊,習慣腳踏N只船了。


             

          posted @ 2010-07-15 11:19 dennis 閱讀(8101) | 評論 (10)編輯 收藏

              詳解clojure遞歸(上)
              詳解clojure遞歸(下)

              遞歸可以說是LISP的靈魂之一,通過遞歸可以簡潔地描述數學公式、函數調用,Clojure是LISP的方言,同樣需要遞歸來扮演重要作用。遞歸的價值在于可以讓你的思維以what的形式思考,而無需考慮how,你寫出來的代碼就是數學公式,就是函數的描述,一切顯得直觀和透明。如果你不習慣遞歸,那只是因為命令式語言的思維根深蒂固,如x=x+1這樣的表達式,從數學的角度來看完全不合法,但是在命令式語言里卻是合法的賦值語句。

             遞歸可以分為直接遞歸和間接遞歸,取決于函數是否直接或者間接地調用自身。如果函數的最后一個調用是遞歸調用,那么這樣的遞歸調用稱為尾遞歸,針對此類遞歸調用,編譯器可以作所謂的尾遞歸優化(TCO),因為遞歸調用是最后一個,因此函數的局部變量等沒有必要再保存,本次調用的結果可以完全作為參數傳遞給下一個遞歸調用,清空當前的棧并復用,那么就不需要為遞歸的函數調用保存一長串的棧,因此不會有棧溢出的問題。在Erlang、LISP這樣的FP語言里,都支持TCO,無論是直接遞歸或者間接遞歸。

             但是由于JVM自身的限制,Clojure和Scala一樣,僅支持直接的尾遞歸優化,將尾遞歸調用優化成循環語句。例如一個求階乘的例子:
             
          ;;第一個版本的階乘函數
          (defn fac [n]
                    (
          if (= 1 n)
                        
          1
                       (
          * n (fac (dec n)))))

             第一個版本的階乘并非尾遞歸,這是因為最后一個表達式的調用是一個乘法運算,而非(fac (dec n)),因此這個版本的階乘在計算大數的時候會導致棧溢出:
          user=> (fac 10000)
          java.lang.StackOverflowError (NO_SOURCE_FILE:
          0)

             將第一個版本改進一下,為了讓最后一個調用是遞歸調用,那么我們需要將結果作為參數來傳遞,而不是倚靠棧來保存,并且為了維持接口一樣,我們引入了一個內部函數fac0:
            
            ;;第二個版本,不是尾遞歸的“尾遞歸”
            (defn fac [n]
                     (defn fac0 [c r]
                        (
          if (= 0 c)
                            r
                            (fac0 (dec c) (
          * c r))))
                     (fac0 n 
          1))

             這個是第二個版本的階乘,通過將結果提取成參數來傳遞,就將fac0函數的遞歸調用修改為尾遞歸的形式,這是個尾遞歸嗎?這在Scala里,在LISP里,這都是尾遞歸,但是Clojure的TCO優化卻是要求使用recur這個特殊形式,而不能直接用函數名作遞歸調用,因此我們這個第二版本在計算大數的時候仍然將棧溢出:
          user=> (fac 10000)
          java.lang.StackOverflowError (NO_SOURCE_FILE:
          0)

             在Clojure里正確地TCO應該是什么樣子的呢?其實只要用recur在最后調用那一下替代fac0即可,這就形成我們第三個版本的階乘:
            ;;第三個版本,TCO起作用了
            (defn fac [n]
                     (defn fac0 [c r]
                        (
          if (= 0 c)
                            r
                            (recur (dec c) (
          * c r))))
                     (fac0 n 
          1))

              此時你再計算大數就沒有問題了,計算(fac 10000)可以正常運行(結果太長,我就不貼出來了)。recur只能跟函數或者loop結合在一起使用,只有函數和loop會形成遞歸點。我們第三個版本就是利用函數fac0做了尾遞歸調用的優化。
             
              loop跟let相似,只不過loop會在頂層形成一個遞歸點,以便recur重新綁定參數,使用loop改寫階乘函數,這時候就不需要定義內部函數了:
          ;;利用loop改寫的第四個版本的階乘函數
          (defn fac [n]
                     (loop [n n r 
          1]
                          (
          if (= n 0)
                              r
                              (recur (dec n) (
          * n r)))))

             loop初始的時候將n綁定為傳入的參數n(由于作用域不同,同名沒有問題),將r綁定為1,最后recur就可以將新的參數值綁定到loop的參數列表并遞歸調用。

             Clojure的TCO是怎么做到的,具體可以看看我前兩天寫的這篇博客,本質上是在編譯的時候將最后的遞歸調用轉化成一條goto語句跳轉到開始的Label,也就是轉變成了循環調用。

             這個階乘函數仍然有優化的空間,可以看到,每次計算其實都有部分是重復計算的,如計算(fac 5)也就是1*2*3*4*5,計算(fac 6)的1*2*3*4*5*6,如果能將前面的計算結果緩存下來,那么計算(fac 6)的時候將更快一些,這可以通過memoize函數來包裝階乘函數:
          ;;第五個版本的階乘,緩存中間結果
          (
          def fac (memoize fac))

          第一次計算(fac 10000)花費的時間長一些,因為還沒有緩存:
          user=> (time (fac 10000)) 
          "Elapsed time: 170.489622 msecs"


          第二次計算快了非常多(其實沒有計算,只是返回緩存結果):
          user=> (time (fac 10000))
          "Elapsed time: 0.058737 msecs"

              可以看到,如果沒有預先緩存,利用memoize包裝的階乘函數也是快不了。memoize的問題在于,計算(fac n)路徑上的沒有用到的值都不會緩存,它只緩存最終的結果,因此如果計算n前面的其他沒有計算過的數字,仍然需要重新計算。那么怎么保存路徑上的值呢?這可以將求階乘轉化成另一個等價問題來解決。
              我們可以將所有的階乘結果組織成一個無窮集合,求階乘變成從這個集合里取第n個元素,這是利用Clojure里集合是lazy的特性,集合里的元素如果沒有使用到,那么就不會預先計算,而是等待要用到的時候才計算出來,定義一個階乘結果的無窮集合,可以利用map將fac作用在整數集合上,map、reduce這樣的高階函數返回的是LazySeq:
           (def fac-seq (map fac (iterate inc 0)))

             (iterate inc 0)定義了正整數集合包括0,0的階乘沒有意義。這個集合的第0項其實是多余的。
             查看fac-seq的類型,這是一個LazySeq:
          user=> (class fac-seq)
          clojure.lang.LazySeq

            求n的階乘,等價于從這個集合里取第n個元素:
          user=> (nth fac-seq 10)
          3628800

            這個集合會比較耗內存,因為會緩存所有計算路徑上的獨立的值,哪怕他們暫時不會被用到。但是這種采用LazySeq的方式來定義階乘函數的方式有個優點,那就是在定義fac-seq使用的fac函數無需一定是符合TCO的函數,我們的第一個版本的階乘函數稍微修改下也可以使用,并且不會棧溢出:
          (defn fac [n]
                    (
          if (<= n 1)
                        
          1
                        (
          * n (fac (dec n)))))

          (
          def fac (memoize fac))
          (
          def fac-seq (map fac (iterate inc 0)))
          (nth fac
          -seq 10000)


            因為集合從0開始,因此只是修改了fac的if條件為n<=1的時候返回1。至于為什么這樣就不會棧溢出,有興趣的朋友可以自己思考下。

              從這個例子也可以看出,一些無法TCO的遞歸調用可以轉化為LazySeq來處理,這算是彌補JVM缺陷的一個辦法。
             


          posted @ 2010-07-14 22:03 dennis 閱讀(5256) | 評論 (4)編輯 收藏


          Clojure 的并發(一) Ref和STM
          Clojure 的并發(二)Write Skew分析
          Clojure 的并發(三)Atom、緩存和性能
          Clojure 的并發(四)Agent深入分析和Actor
          Clojure 的并發(五)binding和let
          Clojure的并發(六)Agent可以改進的地方
          Clojure的并發(七)pmap、pvalues和pcalls
          Clojure的并發(八)future、promise和線程

              Clojure處理并發的思路與眾不同,采用的是所謂STM的模型——軟事務內存。你可以將STM想象成數據庫,只不過是內存型的,它只支持事務的ACI,也就是原子性、一致性、隔離性,但是不包括持久性,因為狀態的保存都在內存里。

              Clojure的并發API分為四種模型:
          1、管理協作式、同步修改可變狀態的Ref
          2、管理非協作式、同步修改可變狀態的Atom
          3、管理異步修改可變狀態的Agent
          4、管理Thread local變量的Var。

              下面將對這四部分作更詳細的介紹。

          一、Ref和STM

           1、ref:

          通過ref函數創建一個可變的引用(reference),指向一個不可變的對象:
          (ref x)

          例子:創建一個歌曲集合:
          (def song (ref #{}))

          2、deref和@:
           取引用的內容,解引用使用deref函數
          (deref song)

          也可以用reader宏@:
          @song

          3、ref-set和dosync:


          改變引用指向的內容,使用ref-set函數
          (ref-set ref new-value)

          如,我們設置新的歌曲集合,加入一首歌:
          (ref-set song #{"Dangerous"})
          但是這樣會報錯:
          java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)

          這是因為引用是可變的,對狀態的更新需要進行保護,傳統語言的話可能采用鎖,Clojure是采用事務,將更新包裝到事務里,這是通過dosync實現的:
          (dosync (ref-set song #{"Dangerous"}))

          dosync的參數接受多個表達式,這些表達式將被包裝在一個事務里,事務支持ACI:
          (1)Atomic,如果你在事務里更新多個Ref,那么這些更新對事務外部來說是一個獨立的操作。
          (2)Consistent,Ref的更新可以設置 validator,如果某個驗證失敗,整個事務將回滾。
          (3)Isolated,運行中的事務無法看到其他事務部分完成的結果。

          dosync更新多個Ref,假設我們還有個演唱者Ref,同時更新歌曲集合和演唱者集合:
          (def singer (ref #{}))
          (dosync (ref
          -set song #{"Dangerous"})
                         (ref
          -set singer #{"MJ"}) )

          @song      
          =>  #{"Dangerous"}
          @singer    
          =>  #{"MJ"}

          4、alter:
          完全更新整個引用的值還是比較少見,更常見的更新是根據當前狀態更新,例如我們向歌曲集合添加一個歌曲,步驟大概是先查詢集合內容,然后往集合里添加歌曲,然后更新整個集合:
          (dosync (ref-set song (conj @song "heal the world")))

          查詢并更新的操作可以合成一步,這是通過alter函數:
          (alter ref update-fn & args)

          alter接收一個更新的函數,函數將在更新的時候調用,傳入當前狀態值并返回新的狀態值,因此上面的例子可以改寫為:
           (dosync (alter song conj "heal the world"))

          這里使用conj而非cons是因為conj接收的第一個參數是集合,也就是當前狀態值,而cons要求第一個參數是將要加入的元素。

          5、commute:
            commute函數是alter的變形,commute顧名思義就是要求update-function是可交換的,它的順序是可以任意排序。commute的允許的并發程度比alter更高一些,因此性能會更好。但是由于commute要求update-function是可交換的,并且會自動重排序,因此如果你的更新要求順序性,那么commute是不能接受的,commute僅可用在對順序性沒有要求或者要求很低的場景:例如更新聊天窗口的聊天信息,由于網絡延遲的因素和個人介入的因素,聊天信息可以認為是天然排序,因此使用commute還可以接受,更新亂序的可能性很低。
            另一個例子就不能使用commute了,如實現一個計數器:
          (def counter (ref 0))

            實現一個next-counter函數獲取計數器的下一個值,我們先使用commute實現:
          (defn next-counter [] (dosync (commute counter inc)))

             這個函數很簡單,每次調用inc遞增counter的值,接下來寫個測試用例:啟動50個線程并發去獲取next counter:
          (dotimes [_ 50] (.start (Thread. #(println (next-counter)))))
            
             這段代碼稍微解釋下,dotimes是重復執行50次,每次啟動new并啟動一個Thread,這個Thread里干了兩件事情:調用next-counter,打印調用結果,第一個版本的next-counter執行下,這是其中一次輸出的截取:
          23
          23
          23

          23
          23
          23
          23
          23
          23
          23
          23
          23
          28
          23
          21
          23
          23
          23
          23
          25
          28

          可以看到有很多的重復數值,這是由于重排序導致事務結束后的值不同,但是你查看counter,確實是50:
          @counter  => 50

          證明更新是沒有問題的,問題出在commute的返回值上。

          如果將next-counter修改為alter實現:
          (defn next-counter [] (dosync (alter counter inc)))

          此時再執行測試用例,可以發現打印結果完全正確了:
          ……
          39
          41
          42
          45
          27
          46
          47
          44
          48
          43
          49
          40
          50

          查看counter,也是正確更新到50了:
          @counter => 50

          最佳實踐:通常情況下,你應該優先使用alter,除非在遇到明顯的性能瓶頸并且對順序不是那么關心的時候,可以考慮用commute替換。

          6、validator:
             類似數據庫,你也可以為Ref添加“約束”,在數據更新的時候需要通過validator函數的驗證,如果驗證不通過,整個事務將回滾。添加validator是通過ref函數傳入metadata的map實現的,例如我們要求歌曲集合添加的歌曲名稱不能為空:
          (def validate-song
               (partial every? #(not (nil?
          %))))

          (def song (ref #{} :validator validate
          -song))

          validate-song是一個驗證函數,partial返回某個函數的半函數(固定了部分參數,部分參數沒固定),你可以將partial理解成currying,雖然還是不同的。validate-song調用every?來驗證集合內的所有元素都不是nil,其中#(not (nil? %))是一個匿名函數,%指向匿名函數的第一個參數,也就是集合的每個元素。ref指定了validator為validate-song,那么在每次更新song集合的時候都會將新的狀態傳入validator函數里驗證一下,如果返回false,整個事務將回滾:

          (dosync (alter song conj nil))
          java.lang.IllegalStateException: Invalid reference state (NO_SOURCE_FILE:
          0)

          更新失敗,非法的reference狀態,查看song果然還是空的:
          @song => #{}

          更新正常的值就沒有問題:
           (dosync (alter song conj "dangerous"))   => #{"dangerous"}

             
          7、ensure:

            ensure函數是為了保護Ref不會被其他事務所修改,它的主要目的是為了防止所謂的“寫偏序”(write skew)問題。寫偏序問題的產生跟STM的實現有關,clojure的STM實現是基于MVCC(Multiversion Concurrency Control)——多版本并發控制,對一個Ref保存多個版本的狀態值,在更新的時候取得當前狀態值的一個隔離的snapshot,更新是基于snapshot進行的。那么我們來看下寫偏序是怎么產生,以一個比喻來描述:
            想象有一個系統用于管理美國最神秘的軍事禁區——51區的安全巡邏,你有3個營的士兵,每個營45個士兵,并且你需要保證總體巡邏的士兵人數不能少于100個人。假設有一天,有兩個指揮官都登錄了這個管理系統,他們都想從某個軍營里抽走20個士兵,假設指揮官A想從1號軍營抽走,指揮官B想要從2號軍營抽走士兵,他們同時執行下列操作:
          Admin 1if ((G1 - 20+ G2 + G3) > 100 then dispatchPatrol

          Admin 
          2if (G1 + (G2 - 20+ G3) > 100 then dispatchPatrol

          我們剛才提到,Clojure的更新是基于隔離的snapshot,一個事務的更改無法看到另一個事務更改了部分的結果,因此這兩個操作都因為滿足(45-20)+45+45=115的約束而得到執行,導致實際抽調走了40個士兵,只剩下95個士兵,低于設定的安全標準100人,這就是寫偏序現象。
            寫偏序的解決就很簡單,在執行抽調前加入ensure即可保護ref不被其他事務所修改。ensure比(ref-set ref @ref)允許的并發程度更高一些。


          Ref和STM的介紹暫時到這里,原理和源碼的解析要留待下一篇文章了。



          posted @ 2010-07-14 02:34 dennis 閱讀(7883) | 評論 (8)編輯 收藏


              解釋器求值的順序可以分為應用序和正則序,應用序是先求值參數,再執行表達式;正則序則是先將表達式按照實際參數展開,然后再執行。具體可以看看過去寫的這篇文章

             Clojure的求值可以肯定是應用序的,如執行
          (defn mytest [a b] 
                (
          if (= a 0)
                    a
                    b))
          (mytest 
          0 1/0)
                  

          盡管在(mytest 0 1/0)中a綁定為0,如果求值器是完全展開再求值,那應該正常執行并返回a,也就是1;但是因為clojure是應用序,因此參數b的1/0會先計算,這顯然會報錯。

             clojure的dosync用于將一些表達式包裝成事務,Ref的更新操作沒有包裝在事務里,會拋出異常
          ;;定義mutable的Ref
           (def song (ref #{}))

          ;;添加一首歌
          (alter song conj 
          "dangerous")

             alter用于向Ref查詢并添加元素,用conj將"dangerous"這首歌加入集合,但是alter要求執行在一個事務里,因此上面的代碼會報錯
          java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)

             如果你用dosync包裝就沒有問題
          user=> (dosync (alter song conj "dangerous"))
          #{
          "dangerous"}

             返回更新后的結果集合。這個跟我們要談的正則序和應用序有什么關系呢?可能你看出來了,如果說clojure是應用序,那么在表達式 (dosync (alter song conj "dangerous"))中,alter也應該先執行,應當照樣報" No transaction running"的錯誤才對,為何卻沒有呢?難道dosync是按照正則序執行?

             查看dosync的文檔
          user=> (doc dosync)
          -------------------------
          clojure.core
          /dosync
          ([
          & exprs])
          Macro
            Runs the exprs (in an implicit 
          do) in a transaction that encompasses
            exprs and any nested calls.  Starts a transaction 
          if none is already
            running on 
          this thread. Any uncaught exception will abort the
            transaction and flow out of dosync. The exprs may be run more than
            once, but any effects on Refs will be atomic.

             這是一個宏,他的作用是將表達式包裝在一個事務里,如果當前線程沒有事務,那么就啟動一個。
          查看源碼:

          (defmacro dosync
            
          "Runs the exprs (in an implicit do) in a transaction that encompasses
            exprs and any nested calls.  Starts a transaction if none is already
            running on 
          this thread. Any uncaught exception will abort the
            transaction and flow out of dosync. The exprs may be run more than
            once, but any effects on Refs will be atomic.
          "
            [& exprs]
            `(sync nil 
          ~@exprs))

             本質上dosync是調用了sync這個宏,sync干了些什么?
          (defmacro sync
            
          "transaction-flags => TBD, pass nil for now

            Runs the exprs (in an implicit 
          do) in a transaction that encompasses
            exprs and any nested calls.  Starts a transaction 
          if none is already
            running on 
          this thread. Any uncaught exception will abort the
            transaction and flow out of sync. The exprs may be run more than
            once, but any effects on Refs will be atomic.
          "
            [flags-ignored-for-now & body]
            `(. clojure.lang.LockingTransaction
                (runInTransaction (fn [] 
          ~@body))))

             找到了,原來是調用了clojure.lang.LockingTransaction.runInTransaction這個靜態方法,并且將exps包裝成一個匿名函數

          fn [] ~@body

               因此,dosync并非正則序,dosync是個宏,(dosync (alter song conj "dangerous"))展開之后,其實是
          (sync nil (fun [] (alter song conj "dangerous")))
             
               這就解答了為什么(dosync (alter song conj "dangerous"))可以正常運行的疑問。宏的使用,首先是展開,然后才是按照應用序的順序求值。


            




          posted @ 2010-07-13 12:02 dennis 閱讀(2519) | 評論 (2)編輯 收藏

              這題目起的嘩眾取寵,其實只是想介紹下怎么查看Clojure動態生成的字節碼,這對分析Clojure的內部實現很重要。

              第一步,下載最新的Clojure 1.1.0源碼并解壓,并導入到你喜歡的IDE。

              其次,下載asm 3.0的源碼并解壓。

              第三,刪除Clojure 1.1.0源碼中的clojure.asm包。clojure并不是引用asm的jar包,而是將asm的源碼合并到clojure中,并且刪除一些只會在調試階段用到的package和class,保留使用asm的最小源碼集合,這可能是處于防止asm不同版本的jar包沖突以及縮小clojure大小的考慮。

              第四,將asm 3.0源碼拷入clojure的源碼中,并將包org.objectweb.asm包括子包整體重名名為clojure.asm。

              第五步,修改Clojure源碼,加入TraceClassVisitor的適配器,用于跟蹤字節碼生成,這需要修改clojure.lang.Compiler類中的兩個compile方法,找到類似
          ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_MAXS);
          // ClassWriter cw = new ClassWriter(0);
          ClassVisitor cv = cw;

          這樣的代碼,將cv修改為TraceClassVisitor:
           ClassVisitor cv = new TraceClassVisitor(new CheckClassAdapter(cw), new PrintWriter(System.out));

              TraceClassVisitor的第二個參數指定將跟蹤到的字節碼輸出到哪里,這里簡單地輸出到標準輸出方便查看。

              第六步,接下來可以嘗試下我們修改過的clojure怎么動態生成字節碼,啟動REPL,
          java clojure.main

          啟動階段就會輸出一些字節碼信息,主要預先加載的一些標準庫函數,如clojure.core中的函數等,REPL啟動完畢,隨便輸入一個表達式都將看到生成的字節碼
          user=> (+ 1 2)

          輸出類似

          compile 1
          // class version 49.0 (49)
          // access flags 33
          public class user$eval__4346 extends clojure/lang/AFunction  {

            
          // compiled from: NO_SOURCE_FILE
            
          // debug info: SMAP
          eval__4346.java
          Clojure
          *S Clojure
          *F
          + 1 NO_SOURCE_FILE
          NO_SOURCE_PATH
          *L
          0#1,1:0
          *E

            
          // access flags 25
            public final static Lclojure/lang/Var; const__0

            
          // access flags 25
            public final static Ljava/lang/Object; const__1

            
          // access flags 25
            public final static Ljava/lang/Object; const__2

            
          // access flags 9
            public static <clinit>()V
             L0
              LINENUMBER 
          2 L0
              LDC 
          "clojure.core"
              LDC 
          "+"
              INVOKESTATIC clojure
          /lang/RT.var (Ljava/lang/String;Ljava/lang/String;)Lclojure/lang/Var;
              CHECKCAST clojure
          /lang/Var
              PUTSTATIC user$eval__4346.const__0 : Lclojure
          /lang/Var;
              ICONST_1
              INVOKESTATIC java
          /lang/Integer.valueOf (I)Ljava/lang/Integer;
              PUTSTATIC user$eval__4346.const__1 : Ljava
          /lang/Object;
              ICONST_2
              INVOKESTATIC java
          /lang/Integer.valueOf (I)Ljava/lang/Integer;
              PUTSTATIC user$eval__4346.const__2 : Ljava
          /lang/Object;
              RETURN
              MAXSTACK 
          = 0
              MAXLOCALS 
          = 0

            
          // access flags 1
            public <init>()V
             L0
              LINENUMBER 
          2 L0
             L1
              ALOAD 
          0
              INVOKESPECIAL clojure
          /lang/AFunction.<init> ()V
             L2
              RETURN
              MAXSTACK 
          = 0
              MAXLOCALS 
          = 0

            
          // access flags 1
            public invoke()Ljava/lang/Object; throws java/lang/Exception 
             L0
              LINENUMBER 
          2 L0
             L1
              LINENUMBER 
          2 L1
              GETSTATIC user$eval__4346.const__1 : Ljava/lang/Object;
              GETSTATIC user$eval__4346.const__2 : Ljava
          /lang/Object;
              INVOKESTATIC clojure
          /lang/Numbers.add (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Number;

             L2
              LOCALVARIABLE 
          this Ljava/lang/Object; L0 L2 0
              ARETURN
              MAXSTACK 
          = 0
              MAXLOCALS 
          = 0
          }
          3

          3就是表達式的結果。可以看到,一個表達式生成了一個class。其中<clinit>是靜態初始化塊,主要是初始化表達式中的字面常量;<init>不用說,默認的構造函數;invoke是核心方法,表達式生成的class,new一個實例后調用的就是invoke方法,執行實際的代碼,高亮部分加載了兩個常量,并執行Number.add方法。

          最后,請Happy hacking!。



          posted @ 2010-07-11 12:07 dennis 閱讀(3613) | 評論 (1)編輯 收藏


              Clojure由于是基于JVM,同樣無法支持完全的尾遞歸優化(TCO),這主要是Java的安全模型決定的,可以看看這個久遠的bug描述。但是Clojure和Scala一樣支持同一個函數的直接調用的尾遞歸優化,也就是同一個函數在函數體的最后調用自身,會優化成循環語句。讓我們看看這是怎么實現的。
              Clojure的recur的特殊形式(special form)就是用于支持這個優化,讓我們看一個例子,經典的求斐波那契數:
          (defn recur-fibo [n]
               (letfn [(fib 
                          [current next n]
                          (
          if (zero? n)
                              current
                              ;recur將遞歸調用fib函數
                              (recur next (
          + current next) (dec n))))]
              (fib 
          0 1 n)))

              recur-fibo這個函數的內部定義了一個fib函數,fib函數的實現就是斐波那契數的定義,fib函數的三個參數分別是當前的斐波那契數(current)、下一個斐波那契數(next)、計數器(n),當計數器為0的時候返回當前的斐波那契數字,否則就將當前的斐波那契數設置為下一個,下一個斐波那契數字等于兩者之和,計數遞減并遞歸調用fib函數。注意,你這里不能直接調用(fib next (+ current next) (dec n)),否則仍將棧溢出。這跟Scala不同,Clojure是用recur關鍵字而非原函數名作TOC優化。

              Clojure是利用asm 3.0作字節碼生成,觀察下recur-fibo生成的字節碼會發現它其實生成了兩個類,類似user$recur_fibo__4346$fib__4348和user$recur_fibo__4346,user是namespace,前一個是recur-fibo中的fib函數的實現,后一個則是recur-fibo自身,這兩個類都繼承自 clojure.lang.AFunction類,值得一提的是前一個類是后一個類的內部類,這跟函數定義相吻合。所有的用戶定義的函數都將繼承 clojure.lang.AFunction。
             
              在這兩個類中都有一個invoke方法,用于實際的方法執行,讓我們看看內部類fib的invoke方法(忽略了一些旁枝末節)
           1 // access flags 1
           2   public invoke(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; throws java/lang/Exception 
           3    L0
           4     LINENUMBER 2 L0
           5    L1
           6     LINENUMBER 4 L1
           7    L2
           8     LINENUMBER 4 L2
           9     ALOAD 3
          10     INVOKESTATIC clojure/lang/Numbers.isZero (Ljava/lang/Object;)Z

          11     IFEQ L3
          12     ALOAD 1
          13     GOTO L4

          14    L5
          15     POP
          16    L3
          17     ALOAD 2
          18    L6
          19     LINENUMBER 6 L6
          20     ALOAD 1
          21     ALOAD 2
          22     INVOKESTATIC clojure/lang/Numbers.add (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Number;

          23    L7
          24     LINENUMBER 6 L7
          25     ALOAD 3
          26     INVOKESTATIC clojure/lang/Numbers.dec (Ljava/lang/Object;)Ljava/lang/Number;

          27     ASTORE 3
          28     ASTORE 2
          29     ASTORE 1
          30     GOTO L0

          31    L4
          32    L8
          33     LOCALVARIABLE this Ljava/lang/Object; L0 L8 0
          34     LOCALVARIABLE current Ljava/lang/Object; L0 L8 1
          35     LOCALVARIABLE next Ljava/lang/Object; L0 L8 2
          36     LOCALVARIABLE n Ljava/lang/Object; L0 L8 3
          37     ARETURN
          38     MAXSTACK = 0
          39     MAXLOCALS = 0

              首先看方法簽名,invoke接收三個參數,都是Object類型,對應fib函數里的current、next和n。
              關鍵指令都已經加亮,9——11行,加載n這個參數,利用Number.isZero判斷n是否為0,如果為0,將1彈入堆,否則彈入0。IFEQ比較棧頂是否為0,為0(也就是n不為0)就跳轉到L3,否則繼續執行(n為0,加載參數1,也就是current,然后跳轉到L4,最后通過ARETURN返回值current作結果。
             
              指令20——22行,加載current和next,執行相加操作,生成下一個斐波那契數。
              指令25-——26行,加載n并遞減。
              指令27——29行,將本次計算的結果存儲到local變量區,覆蓋了原有的值。
              指令30行,跳轉到L0,重新開始執行fib函數,此時local變量區的參數值已經是上一次執行的結果。
             
              有的朋友可能要問,為什么加載current是用aload 1,而不是aload 0,處在0位置上的是什么?0位置上存儲的就是著名的this指針,invoke是實例方法,第一個參數一定是this。

             從上面的分析可以看到,recur干的事情就兩件:覆蓋原有的local變量,以及跳轉到函數開頭執行循環操作,這就是所謂的軟尾遞歸優化。這從RecurExp的實現也可以看出來:

             //覆蓋變量
            for (int i = loopLocals.count() - 1; i >= 0; i--) {
                          LocalBinding lb 
          = (LocalBinding) loopLocals.nth(i);
                          Class primc 
          = lb.getPrimitiveType();
                          
          if (primc != null) {
                              gen.visitVarInsn(Type.getType(primc).getOpcode(Opcodes.ISTORE), lb.idx);
                          }
                          
          else {
                              gen.visitVarInsn(OBJECT_TYPE.getOpcode(Opcodes.ISTORE), lb.idx);
                          }
                      }
             
          //執行跳轉
             gen.goTo(loopLabel);
           
                recur分析完了,最后有興趣可以看下recur-fibo的invoke字節碼
           1  L0
           2     LINENUMBER 1 L0
           3     ACONST_NULL
           4     ASTORE 2
           5     NEW user$recur_fibo__4346$fib__4348
           6     DUP
           7     INVOKESPECIAL user$recur_fibo__4346$fib__4348.<init> ()V

           8     ASTORE 2
           9     ALOAD 2
          10     CHECKCAST user$recur_fibo__4346$fib__4348
          11     POP
          12    L1
          13    L2
          14     LINENUMBER 7 L2
          15     ALOAD 2
          16     CHECKCAST clojure/lang/IFn
          17     GETSTATIC user$recur_fibo__4346.const__2 : Ljava/lang/Object;
          18     GETSTATIC user$recur_fibo__4346.const__3 : Ljava/lang/Object;
          19     ALOAD 1
          20     ACONST_NULL
          21     ASTORE 1
          22     ACONST_NULL
          23     ASTORE 2
          24     INVOKEINTERFACE clojure/lang/IFn.invoke (Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
          25    L3
          26     LOCALVARIABLE fib Ljava/lang/Object; L1 L3 2
          27    L4
          28     LOCALVARIABLE this Ljava/lang/Object; L0 L4 0
          29     LOCALVARIABLE n Ljava/lang/Object; L0 L4 1
          30     ARETURN

               5——7行,實例化一個內部的fib函數。
               24行,調用fib對象的invoke方法,傳入3個初始參數。

               簡單來說,recur-fibo生成的對象里只是new了一個fib生成的對象,然后調用它的invoke方法,這也揭示了Clojure的內部函數的實現機制。
                

          posted @ 2010-07-11 04:20 dennis 閱讀(4265) | 評論 (0)編輯 收藏

             托爾多正式宣布退役了,下個賽季他將會留在國際米蘭做青訓方面的工作,他在2000年歐洲杯上的經典表現將永載史冊。他為國際米蘭效力了十年,這也是我看國米比賽的十年。



          posted @ 2010-07-10 00:10 dennis 閱讀(728) | 評論 (0)編輯 收藏

          僅列出標題
          共56頁: First 上一頁 7 8 9 10 11 12 13 14 15 下一頁 Last 
          主站蜘蛛池模板: 怀来县| 平乡县| 错那县| 佳木斯市| 枣阳市| 阿克| 从江县| 阿巴嘎旗| 香格里拉县| 平凉市| 敖汉旗| 湘西| 枝江市| 龙陵县| 新密市| 贵定县| 武义县| 永城市| 永仁县| 巫溪县| 周至县| 龙门县| 三台县| 桂阳县| 枝江市| 高唐县| 霸州市| 濮阳县| 咸丰县| 宁城县| 炉霍县| 浦东新区| 安乡县| 北宁市| 贡嘎县| 柳河县| 沂南县| 固阳县| 望江县| 江西省| 固镇县|