莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          Clojure的并發(七)pmap、pvalues和pcalls

          Posted on 2010-08-04 23:54 dennis 閱讀(3470) 評論(0)  編輯  收藏 所屬分類: Clojure
          Clojure 的并發(一) Ref和STM
          Clojure 的并發(二)Write Skew分析
          Clojure 的并發(三)Atom、緩存和性能
          Clojure 的并發(四)Agent深入分析和Actor
          Clojure 的并發(五)binding和let
          Clojure的并發(六)Agent可以改進的地方
          Clojure的并發(七)pmap、pvalues和pcalls
          Clojure的并發(八)future、promise和線程

          七、并發函數pmap、pvalues和pcalls

           1、pmap是map的進化版本,map將function依次作用于集合的每個元素,pmap也是這樣,但是它對于每個集合中的元素都是提交給一個線程去執行function,也就是并行地對集合里的元素執行指定的函數。通過一個例子來解釋下。我們先定義一個make-heavy函數用于延時執行某個函數:
          (defn make-heavy [f]
                  (fn [
          & args]
                      (Thread
          /sleep 1000)
                      (apply f args)))

          make-heavy接受一個函數f作為參數,返回一個新的函數,它延時一秒才實際執行f。我們利用make-heavy包裝inc,然后執行下map:

          user=> (time (doall (map (make-heavy inc) [1 2 3 4 5])))
          "Elapsed time: 5005.115601 msecs"
          (
          2 3 4 5 6)

          可以看到總共執行了5秒,這是因為map依次將包裝后的inc作用在每個元素上,每次調用都延時一秒,總共5個元素,因此延時了5秒左右。這里使用doall,是為了強制map返回的lazy-seq馬上執行。

          如果我們使用pmap替代map的話:
          user=> (time (doall (pmap (make-heavy inc) [1 2 3 4 5])))
          "Elapsed time: 1001.146444 msecs"
          (
          2 3 4 5 6)

          果然快了很多,只用了1秒多,顯然pmap并行地將make-heavy包裝后的inc作用在集合的5個元素上,總耗時就接近于于單個調用的耗時,也就是一秒。


          2、pvalues和pcalls是在pmap之上的封裝,pvalues是并行地執行多個表達式并返回執行結果組成的LazySeq,pcalls則是并行地調用多個無參數的函數并返回調用結果組成的LazySeq。

          user=> (pvalues (+ 1 2) (- 1 2) (* 1 2) (/ 1 2))
          (
          3 -1 2 1/2)

          user=> (pcalls #(println "hello") #(println "world"))
          hello
          world
          (nil nil)

          3、pmap的并行,從實現上來說,是集合有多少個元素就使用多少個線程:
           1 (defn pmap
           2   {:added "1.0"}
           3   ([f coll]
           4    (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
           5          rets (map #(future (f %)) coll)
           6          step (fn step [[x & xs :as vs] fs]
           7                 (lazy-seq
           8                  (if-let [s (seq fs)]
           9                    (cons (deref x) (step xs (rest s)))
          10                    (map deref vs))))]
          11      (step rets (drop n rets))))
          12   ([f coll & colls]
          13    (let [step (fn step [cs]
          14                 (lazy-seq
          15                  (let [ss (map seq cs)]
          16                    (when (every? identity ss)
          17                      (cons (map first ss) (step (map rest ss)))))))]
          18      (pmap #(apply f %) (step (cons coll colls))))))

          在第5行,利用map和future將函數f作用在集合的每個元素上,future是將函數f(實現callable接口)提交給Agent的CachedThreadPool處理,跟agent的send-off共用線程池

          但是由于有chunked-sequence的存在,實際上調用的線程數不會超過chunked的大小,也就是32。事實上,pmap啟動多少個線程取決于集合的類型,對于chunked-sequence,是以32個元素為單位來批量執行,通過下面的測試可以看出來,range返回的是一個chunked-sequence,clojure 1.1引入了chunked-sequence,目前那些返回LazySeq的函數如map、filter、keep等都是返回chunked-sequence:

          user=> (time (doall (pmap (make-heavy inc) (range 0 32))))
          "Elapsed time: 1003.372366 msecs"
          (
          1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32)

          user
          => (time (doall (pmap (make-heavy inc) (range 0 64))))
          "Elapsed time: 2008.153617 msecs"
          (
          1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64)

          可以看到,對于32個元素,執行(make-heavy inc)耗費了一秒左右;對于64個元素,總耗時是2秒,這可以證明64個元素是分為兩個批次并行執行,一批32個元素,啟動32個線程(可以通過jstack查看)。


          并且pmap的執行是半延時的(semi-lazy),前面的總數-(cpus+2)個元素是一個一個deref(future通過deref來阻塞獲取結果),后cpus+2個元素則是一次性調用map執行deref。

          4、pmap的適用場景取決于將集合分解并提交給線程池并行執行的代價是否低于函數f執行的代價,如果函數f的執行代價很低,那么將集合分解并提交線程的代價可能超過了帶來的好處,pmap就不一定能帶來性能的提升。pmap只適合那些計算密集型的函數f,計算的耗時超過了協調的代價。

          5、關于chunked-sequence可以看看這篇報道,也可以參考Rich Hickey的PPT。chunk sequence的思路類似批量處理來提高系統的吞吐量。


          主站蜘蛛池模板: 三穗县| 景德镇市| 景东| 赤峰市| 靖安县| 铜川市| 涞源县| 渝北区| 张北县| 青田县| 普兰店市| 祁东县| 潜江市| 普安县| 博白县| 许昌县| 怀化市| 当涂县| 花莲县| 新津县| 同江市| 锦屏县| 南阳市| 泽普县| 团风县| 巴中市| 肃宁县| 绥江县| 休宁县| 资阳市| 巴楚县| 北宁市| 贺州市| 随州市| 纳雍县| 广安市| 甘洛县| 永登县| 仪陇县| 海丰县| 阜南县|