Big Data Road

          Clojure DSL

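               Storm comes with a Clojure DSL for defining spouts, bolts, and topologies. The Clojure DSL has access to everything the Java API exposes, so if you are a Clojure user you can write Storm topologies directly in Clojure without touching Java. The Clojure DSL is defined in the backtype.storm.clojure namespace.
                 This page outlines all the pieces of the Clojure DSL, including:
                  1. Defining topologies
                  2. Defining bolts (defbolt)
                  3. Defining spouts (defspout)
                  4. Running topologies in local mode or on a cluster
                  5. Testing topologies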
                

          Defining topologies

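                 To define a topology, use the topology function. topology takes two arguments: a map of "spout specs" and a map of "bolt specs". Each spout and bolt spec wires the code for the component into the topology by specifying things like inputs and parallelism.
                Let's take a look at an example topology definition from the storm-starter project: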
          (topology
           {"1" (spout-spec sentence-spout)
            "2" (spout-spec (sentence-spout-parameterized
                             ["the cat jumped over the door"
                              "greetings from a faraway land"])
                             :p 2)}
           {"3" (bolt-spec {"1" :shuffle "2" :shuffle}
                           split-sentence
                           :p 5)
            "4" (bolt-spec {"3" ["word"]}
                           word-count
                           :p 6)})
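          The maps of spout and bolt specs are maps from the component id to the corresponding spec. The component ids must be unique across both maps. Just like when defining topologies in Java, component ids are used when declaring the inputs for bolts in the topology.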

          spout-spec

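          spout-spec takes as arguments the spout implementation and optional keyword arguments. The only option that currently exists is :p, which specifies the parallelism for the spout. If you omit :p, the spout will execute as a single task.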

          bolt-spec

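          bolt-spec takes as arguments the input declaration for the bolt, the bolt implementation, and optional keyword arguments. The input declaration is a map from stream ids to stream groupings. A stream id can have one of two forms:

          1. [component-id stream-id]: subscribes to a specific stream on a component
          2. component-id: subscribes to the default stream on a component

          A stream grouping can be one of the following:

          1. :shuffle: subscribes with a shuffle grouping
          2. Vector of field names, like ["id" "name"]: subscribes with a fields grouping on the specified fields
          3. :global: subscribes with a global grouping
          4. :all: subscribes with an all grouping
          5. :direct: subscribes with a direct grouping

          See Concepts for more info on stream groupings. Here's an example input declaration showcasing the various ways to declare inputs: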

          {["2" "1"] :shuffle  "3" ["field1" "field2"]  ["4" "2"] :global} 
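          This input declaration subscribes to a total of three streams. It subscribes to stream "1" on component "2" with a shuffle grouping, subscribes to the default stream on component "3" with a fields grouping on the fields "field1" and "field2", and subscribes to stream "2" on component "4" with a global grouping.
          Like spout-spec, the only keyword argument currently supported by bolt-spec is :p, which specifies the parallelism for the bolt.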
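
Because an input declaration is an ordinary Clojure map, its two stream id forms and its grouping forms can be inspected with plain Clojure. The following is a minimal sketch and not Storm's own code: describe-inputs and its helpers are hypothetical names, and :default-stream is only a placeholder standing in for the default stream.

```clojure
;; Hypothetical helpers for illustration only; none of these are part of Storm.

(defn normalize-stream-id
  "Returns [component-id stream-id]. The short form (a bare component id)
  is expanded with the placeholder :default-stream."
  [stream-id]
  (if (vector? stream-id)
    stream-id
    [stream-id :default-stream]))

(defn grouping-kind
  "A vector of field names denotes a fields grouping; keywords name themselves."
  [grouping]
  (if (vector? grouping) :fields grouping))

(defn describe-inputs
  "Turns an input declaration into a seq of readable maps."
  [decl]
  (for [[stream-id grouping] decl
        :let [[component stream] (normalize-stream-id stream-id)]]
    {:component component
     :stream    stream
     :grouping  (grouping-kind grouping)}))

;; The input declaration from the example above.
(def input-decl
  {["2" "1"] :shuffle
   "3"       ["field1" "field2"]
   ["4" "2"] :global})

(doseq [input (describe-inputs input-decl)]
  (println input))
```

Each printed map corresponds to one subscription described in the paragraph above.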

          shell-bolt-spec

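          shell-bolt-spec is used for defining bolts that are implemented in a non-JVM language. It takes as arguments the input declaration, the command line program to run, the name of the file implementing the bolt, an output specification, and then the same keyword arguments that bolt-spec accepts.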

          Here's an example of a shell-bolt-spec:

          (shell-bolt-spec {"1" :shuffle "2" ["id"]}  "python"  "mybolt.py"  ["outfield1" "outfield2"]  :p 25) 

           

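          The syntax for output declarations is described in more detail in the defbolt section below. See Using non-JVM languages with Storm for more details on how multilang works within Storm.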

           

          defbolt

          defbolt is used for defining bolts in Clojure. Bolts have the constraint that they must be serializable, and this is why you can't just reify IRichBolt to implement a bolt (closures aren't serializable). defbolt works around this restriction and provides a nicer syntax for defining bolts than just implementing a Java interface.

          At its fullest expressiveness, defbolt supports parameterized bolts and maintaining state in a closure around the bolt implementation. It also provides shortcuts for defining bolts that don't need this extra functionality. The signature for defbolt looks like the following:

          (defbolt name output-declaration *option-map & impl)

          Omitting the option map is equivalent to having an option map of {:prepare false}.

          Simple bolts

          Let's start with the simplest form of defbolt. Here's an example bolt that splits a tuple containing a sentence into a tuple for each word:

          (defbolt split-sentence ["word"] [tuple collector]
            (let [words (.split (.getString tuple 0) " ")]
              (doseq [w words]
                (emit-bolt! collector [w] :anchor tuple))
              (ack! collector tuple)))

          Since the option map is omitted, this is a non-prepared bolt. The DSL simply expects an implementation for the execute method of IRichBolt. The implementation takes two parameters, the tuple and the OutputCollector, and is followed by the body of the execute function. The DSL automatically type-hints the parameters for you so you don't need to worry about reflection if you use Java interop.
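
The Java interop in the body of split-sentence includes a call to String's split method. As a small sketch outside Storm (split-words is a hypothetical name), the same call with a manual type hint looks like this; inside defbolt the DSL adds the hints on the tuple and collector parameters for you.

```clojure
;; Plain Clojure, no Storm required. split-words is a hypothetical helper.
(defn split-words
  "Splits a sentence on single spaces, as split-sentence does for each tuple.
  The ^String hint lets the compiler resolve .split without reflection."
  [^String sentence]
  (vec (.split sentence " ")))

(println (split-words "the cat jumped over the door"))
```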

          This implementation binds split-sentence to an actual IRichBolt object that you can use in topologies, like so:

          (bolt-spec {"1" :shuffle}  split-sentence  :p 5) 

          Parameterized bolts

          Many times you want to parameterize your bolts with other arguments. For example, let's say you wanted to have a bolt that appends a suffix to every input string it receives, and you want that suffix to be set at runtime. You do this with defbolt by including a :params option in the option map, like so:

          (defbolt suffix-appender ["word"] {:params [suffix]}
            [tuple collector]
            (emit-bolt! collector [(str (.getString tuple 0) suffix)] :anchor tuple))

          Unlike the previous example, suffix-appender will be bound to a function that returns an IRichBolt rather than be an IRichBolt object directly. This is caused by specifying :params in its option map. So to use suffix-appender in a topology, you would do something like:

          (bolt-spec {"1" :shuffle}  (suffix-appender "-suffix")  :p 10) 
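
The same pattern exists in plain Clojure as a function that closes over its argument and returns another function. The sketch below is only an analogy with hypothetical names: it returns an ordinary function, whereas the real suffix-appender returns an IRichBolt.

```clojure
;; Analogy only: make-suffix-appender is hypothetical and unrelated to Storm's classes.
(defn make-suffix-appender
  "Returns a function that appends suffix to its argument."
  [suffix]
  (fn [word] (str word suffix)))

;; Calling the outer function fixes the parameter, much like (suffix-appender "-suffix").
(def append-suffix (make-suffix-appender "-suffix"))

(println (append-suffix "storm"))
```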

          Prepared bolts

          To do more complex bolts, such as ones that do joins and streaming aggregations, the bolt needs to store state. You can do this by creating a prepared bolt which is specified by including {:prepare true} in the option map. Consider, for example, this bolt that implements word counting:

          (defbolt word-count ["word" "count"] {:prepare true}
            [conf context collector]
            (let [counts (atom {})]
              (bolt
               (execute [tuple]
                 (let [word (.getString tuple 0)]
                   (swap! counts (partial merge-with +) {word 1})
                   (emit-bolt! collector [word (@counts word)] :anchor tuple)
                   (ack! collector tuple))))))

          The implementation for a prepared bolt is a function that takes as input the topology config, TopologyContext, and OutputCollector, and returns an implementation of the IBolt interface. This design allows you to have a closure around the implementation of execute and cleanup.

          In this example, the word counts are stored in the closure in a map called counts. The bolt macro is used to create the IBolt implementation. The bolt macro is a more concise way to implement the interface than reifying, and it automatically type-hints all of the method parameters. This bolt implements the execute method which updates the count in the map and emits the new word count.

          Note that the execute method in prepared bolts only takes as input the tuple since the OutputCollector is already in the closure of the function (for simple bolts the collector is a second parameter to the execute function).
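
The state handling in word-count can be tried without Storm. This is a minimal sketch with a hypothetical make-word-counter function: the atom lives in a closure, as in the prepared bolt, and each call performs the same swap! with merge-with before returning the updated count.

```clojure
;; Plain Clojure, no Storm required. make-word-counter is a hypothetical name.
(defn make-word-counter
  "Returns a function that increments and returns the running count for a word.
  The counts map is held in an atom captured by the closure."
  []
  (let [counts (atom {})]
    (fn [word]
      ;; merge-with + adds 1 to an existing entry or creates it with value 1.
      (swap! counts (partial merge-with +) {word 1})
      (@counts word))))

(def count-word! (make-word-counter))

(count-word! "cat")
(count-word! "dog")
(println (count-word! "cat"))
```

Each call to make-word-counter creates its own atom, so two counters never share counts.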

          Prepared bolts can be parameterized just like simple bolts.

          Output declarations

          The Clojure DSL has a concise syntax for declaring the outputs of a bolt. The most general way to declare the outputs is as a map from stream id to a stream spec. For example:

          {"1" ["field1" "field2"]  "2" (direct-stream ["f1" "f2" "f3"])  "3" ["f1"]} 

          The stream id is a string, while the stream spec is either a vector of fields or a vector of fields wrapped by direct-stream. direct-stream marks the stream as a direct stream (see Concepts and Direct groupings for more details on direct streams).

          If the bolt only has one output stream, you can define the default stream of the bolt by using a vector instead of a map for the output declaration. For example:

          ["word" "count"] 
          This declares the output of the bolt as the fields ["word" "count"] on the default stream id.

          Emitting, acking, and failing

          Rather than use the Java methods on OutputCollector directly, the DSL provides a nicer set of functions for using OutputCollector: emit-bolt!, emit-direct-bolt!, ack!, and fail!.

          1. emit-bolt!: takes as parameters the OutputCollector, the values to emit (a Clojure sequence), and keyword arguments for :anchor and :stream. :anchor can be a single tuple or a list of tuples, and :stream is the id of the stream to emit to. Omitting the keyword arguments emits an unanchored tuple to the default stream.
          2. emit-direct-bolt!: takes as parameters the OutputCollector, the task id to send the tuple to, the values to emit, and keyword arguments for :anchor and :stream. This function can only emit to streams declared as direct streams.
          3. ack!: takes as parameters the OutputCollector and the tuple to ack.
          4. fail!: takes as parameters the OutputCollector and the tuple to fail.

          See Guaranteeing message processing for more info on acking and anchoring.

          defspout

          defspout is used for defining spouts in Clojure. Like bolts, spouts must be serializable so you can't just reify IRichSpout to do spout implementations in Clojure. defspout works around this restriction and provides a nicer syntax for defining spouts than just implementing a Java interface.

          The signature for defspout looks like the following:

          (defspout name output-declaration *option-map & impl)

          If you leave out the option map, it defaults to {:prepare true}. The output declaration for defspout has the same syntax as defbolt.

          Here's an example defspout implementation from storm-starter:

          (defspout sentence-spout ["sentence"]
            [conf context collector]
            (let [sentences ["a little brown dog"
                             "the man petted the dog"
                             "four score and seven years ago"
                             "an apple a day keeps the doctor away"]]
              (spout
               (nextTuple []
                 (Thread/sleep 100)
                 (emit-spout! collector [(rand-nth sentences)]))
               (ack [id]
                 ;; You only need to define this method for reliable spouts
                 ;; (such as one that reads off of a queue like Kestrel)
                 ;; This is an unreliable spout, so it does nothing here
                 ))))

          The implementation takes in as input the topology config, TopologyContext, and SpoutOutputCollector. The implementation returns an ISpout object. Here, the nextTuple function emits a random sentence from sentences.

          This spout isn't reliable, so the ack and fail methods will never be called. A reliable spout will add a message id when emitting tuples, and then ack or fail will be called when the tuple is completed or failed respectively. See Guaranteeing message processing for more info on how reliability works within Storm.

          emit-spout! takes in as parameters the SpoutOutputCollector and the new tuple to be emitted, and accepts as keyword arguments :stream and :id. :stream specifies the stream to emit to, and :id specifies a message id for the tuple (used in the ack and fail callbacks). Omitting these arguments emits an unanchored tuple to the default output stream.

          There is also an emit-direct-spout! function that emits a tuple to a direct stream. It takes one additional argument, passed as the second parameter: the id of the task to send the tuple to.

          Spouts can be parameterized just like bolts, in which case the symbol is bound to a function returning IRichSpout instead of the IRichSpout itself. You can also declare an unprepared spout which only defines the nextTuple method. Here is an example of an unprepared spout that emits random sentences parameterized at runtime:

          (defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
            [collector]
            (Thread/sleep 500)
            (emit-spout! collector [(rand-nth sentences)]))

          The following example illustrates how to use this spout in a spout-spec:

          (spout-spec (sentence-spout-parameterized
                       ["the cat jumped over the door"
                        "greetings from a faraway land"])
                      :p 2)

          Running topologies in local mode or on a cluster

          That's all there is to the Clojure DSL. To submit topologies in remote mode or local mode, use the StormSubmitter or LocalCluster classes, just as you would from Java.

          To create topology configs, it's easiest to use the backtype.storm.config namespace which defines constants for all of the possible configs. The constants are the same as the static constants in the Config class, except with dashes instead of underscores. For example, here's a topology config that sets the number of workers to 15 and configures the topology in debug mode:

          {TOPOLOGY-DEBUG true  TOPOLOGY-WORKERS 15} 
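
Putting this section together, submission from Clojure might look like the sketch below. It assumes Storm is on the classpath; the namespace name, run-local!, and submit-topology! are hypothetical, the topology argument is expected to be the value returned by the topology function, and the topology name and the 10-second sleep are arbitrary choices.

```clojure
;; A sketch, assuming Storm is on the classpath. Function and namespace names are hypothetical.
(ns example.submit
  (:import [backtype.storm StormSubmitter LocalCluster])
  (:use [backtype.storm config]))

(defn run-local!
  "Runs the topology in an in-process cluster for about ten seconds, then shuts down."
  [topology]
  (let [cluster (LocalCluster.)]
    (.submitTopology cluster "example-topology" {TOPOLOGY-DEBUG true} topology)
    (Thread/sleep 10000)
    (.shutdown cluster)))

(defn submit-topology!
  "Submits the topology to a remote cluster under the given name."
  [topology-name topology]
  (StormSubmitter/submitTopology topology-name {TOPOLOGY-WORKERS 15} topology))
```

The config maps are plain Clojure maps keyed by the constants from backtype.storm.config, so the map shown above could be passed in place of either one.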

          Testing topologies

          This blog post and its follow-up give a good overview of Storm's powerful built-in facilities for testing topologies in Clojure.


          Posted on 2012-01-19 15:38 by 徐紅星. Category: Storm

