xylz,imxylz

          關(guān)注后端架構(gòu)、中間件、分布式和并發(fā)編程

             :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評(píng)論 :: 0 Trackbacks
          Jafka 是一個(gè)開源的/性能良好的分布式消息系統(tǒng)。在上一篇文章中有所簡單介紹。
          下面是一篇簡單的入門文檔。更多詳細(xì)的文檔參考wiki。

          Step 1: 下載最新的安裝包

          完整的安裝指南在這里。
          最新的發(fā)行版地址在:https://github.com/adyliu/jafka/downloads

          $wget https://github.com/downloads/adyliu/jafka/jafka-1.0.tgz 
          $tar xzf jafka-1.0.tgz
          $cd jafka-1.0

          可選配置,設(shè)置一個(gè)環(huán)境變量。 $export $JAFKA_HOME=/opt/apps/jafka-1.0 以下假設(shè)所有操作目錄都在$JAFKA_HOME下。

          Step 2: 啟動(dòng)服務(wù)端

          這里啟動(dòng)一個(gè)單進(jìn)程的服務(wù)端,使用默認(rèn)的配置啟動(dòng)即可。由于一些路徑使用了相對(duì)路徑,因此需要在jafka的主目錄下運(yùn)行。

          $bash bin/server-single.sh config/server-single.properties 

          默認(rèn)情況下,無需任何配置即可運(yùn)行服務(wù)端。這時(shí)服務(wù)端會(huì)將9092端口綁定到所有網(wǎng)卡上。

          Step 3: 發(fā)送消息

          使用自帶的小命令行就可以發(fā)送簡單的文本消息。

          $bin/producer-console.sh --broker-list 0:localhost:9092 --topic demo 
          > Welcome to jafka
          > 中文中國

          producer-console.sh有一些參數(shù),這可以通過執(zhí)行下面的命令得到。 $bin/producer-console.sh

          發(fā)送消息只需要在提示符號(hào)'>'輸入文本即可,沒有出錯(cuò)意味著發(fā)送成功,直接回車或者輸入CTRL+C退出程序。

          Step 4: 啟動(dòng)消費(fèi)者

          現(xiàn)在是時(shí)候消費(fèi)剛才發(fā)送的消息。

          同樣Jafka自帶一個(gè)小程序能夠消費(fèi)簡單的文本消息。

          $bin/simple-consumer-console.sh --topic demo --server jafka://localhost:9092 
          [1] 26: Welcome to jafka
          [2] 48: 中文中國

          連接上服務(wù)端后,立即就看到有消息消費(fèi)了。默認(rèn)情況下simple-consumer-console.sh輸出消息的序號(hào)(實(shí)際上不存在)以及消息的下一個(gè)偏移量(offset)。

          解壓縮后只需要執(zhí)行上面三條命令就可以完成簡單的消息發(fā)送和接受演示。這就是一個(gè)簡單的消息系統(tǒng)。

          Step 5: 手動(dòng)編碼

          我們希望利用提供的API手動(dòng)編碼能夠發(fā)送和接受一些消息。

          消息發(fā)送者

          首先寫一個(gè)簡單的消息發(fā)送者。

          public static void main(String[] args) throws Exception {
              Properties props 
          = new Properties();
              props.put(
          "broker.list""0:127.0.0.1:9092");
              props.put(
          "serializer.class", StringEncoder.class.getName());
              
          //
              ProducerConfig config = new ProducerConfig(props);
              Producer
          <String, String> producer = new Producer<String, String>(config);
              
          //
              StringProducerData data = new StringProducerData("demo");
              
          for(int i=0;i<1000;i++) {
                  data.add(
          "Hello world #"+i);
              }
              
          //
              try {
                  
          long start = System.currentTimeMillis();
                  
          for (int i = 0; i < 100; i++) {
                      producer.send(data);
                  }
                  
          long cost = System.currentTimeMillis() - start;
                  System.out.println(
          "send 100000 message cost: "+cost+" ms");
              } 
          finally {
                  producer.close();
              }
          }

           

          看起來有點(diǎn)復(fù)雜,我們簡單分解下。

          配置參數(shù)

          首先需要配置服務(wù)端的地址。一個(gè)jfaka服務(wù)端地址格式如下:

          brokerId:host:port 
          • brokerId 用于標(biāo)識(shí)服務(wù)進(jìn)程,這在一個(gè)集群里面是全局唯一的
          • host/port 用戶描述服務(wù)監(jiān)聽的ip地址和端口,默認(rèn)情況下會(huì)在所有網(wǎng)卡的9092端口監(jiān)聽數(shù)據(jù)。

          配置完服務(wù)端信息后,我們需要提供一個(gè)消息編碼。

          消息編碼用于將任意消息類型編碼成字節(jié)數(shù)組,這些字節(jié)數(shù)組就是我們的消息體。 

          默認(rèn)情況下Jafka解析字節(jié)數(shù)組編碼,也就是原封不動(dòng)的發(fā)送出去。這里簡單替換下,使用字符串UTF-8編碼。

          構(gòu)造消息客戶端

          使用上面簡單的參數(shù)就可以構(gòu)造出來一個(gè)簡單的消息發(fā)送客戶端。

          消息發(fā)送客戶端(Producer)用于管理與服務(wù)端之間的連接,并將消息按照指定的編碼方式發(fā)送給服務(wù)端。 

          構(gòu)造消息

          用于使用字符串編碼,因此這里只能發(fā)送字符串的數(shù)據(jù)。每一個(gè)消息數(shù)據(jù)包都可以帶有多條消息,只需要滿足一個(gè)消息數(shù)據(jù)包的大小不超過默認(rèn)的1M即可。比如下面就構(gòu)造發(fā)往主題為demo的100條消息的數(shù)據(jù)包:

          StringProducerData data = new StringProducerData("demo");
          for(int i=0;i<1000;i++) {
          data.add("Hello world #"+i);
          }

          發(fā)送消息

          最后發(fā)送消息只需要調(diào)用producer.send()即可。上述例子中循環(huán)發(fā)送100次。

          下面是某次發(fā)送的結(jié)果:

          $bin/run-console.sh demo.client.StaticBrokerSender
          send 100000 message cost: 685 ms

          消息接受者

          接受消息的邏輯非常簡單,只需要配置服務(wù)端的地址,然后從偏移量0開始順序消費(fèi)消息即可。

          下面的邏輯是簡單的將接受的消息以UTF-8的字符串展示。

          SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092);
          //
          long offset = 0;
          while (true) {
              FetchRequest request = new FetchRequest("test", 0, offset);
              for (MessageAndOffset msg : consumer.fetch(request)) {
                  System.out.println(Utils.toString(msg.message.payload(), "UTF-8"));
                  offset = msg.offset;
              }
          }


          整合ZooKeeper

          Jafka 使用zookeeper進(jìn)行自動(dòng)broker尋址以及消費(fèi)者負(fù)載均衡。

          (1)啟動(dòng)zookeeper服務(wù)

          測(cè)試時(shí)可以使用一個(gè)單進(jìn)程的zookeeper用于替換zookeeper集群。

          $bin/zookeeper-server.sh config/zookeeper.properties 

          (2)啟動(dòng)Jafka服務(wù)端

          $bin/server-single.sh config/server.properties 
          [2012-04-24 12:29:56,526] INFO Starting Jafka server (com.sohu.jafka.server.Server.java:68)
          [2012-04-24 12:29:56,532] INFO starting log cleaner every 60000 ms (com.sohu.jafka.log.LogManager.java:155)
          [2012-04-24 12:29:56,552] INFO connecting to zookeeper: 127.0.0.1:2181 (com.sohu.jafka.server.Zookeeper.java:80)
          [2012-04-24 12:29:56,568] INFO Starting ZkClient event thread. (com.github.zkclient.ZkEventThread.java:64)


          服務(wù)端啟動(dòng)后自動(dòng)向zookeeper注冊(cè)服務(wù)端的信息,例如ip地址、端口、已存在的消息等。

          (3)啟動(dòng)消息發(fā)送者

          $bin/producer-console.sh --zookeeper localhost:2181 --topic demo
          Enter you message and exit with empty string.
          > Jafka second day
          > Jafka use zookeeper to search brokers and consumers                                       

          和上面啟動(dòng)的消息發(fā)送者類似,只不過這里使用zookeeper配置自動(dòng)尋找服務(wù)端,而不是指定服務(wù)端地址。

          (4)啟動(dòng)消息接受者

          $bin/consumer-console.sh --zookeeper localhost:2181 --topic demo --from-beginning
          Jafka second day
          Jafka use zookeeper to search brokers and consumers

          這時(shí)候很快就看到剛才發(fā)送的消息了。

          由于使用zookeeper作為配置中心,因此可以啟動(dòng)更多的服務(wù)端、消息發(fā)送者、消息接受者。只需要保證都連接zookeeper,并且所有的服務(wù)端都有唯一的brokerId(位于server.properties中).

          (5)API使用

          上面是使用自帶的程序發(fā)送簡單的文本消息。這里利用API來進(jìn)行開發(fā)。

          發(fā)送消息

             public static void main(String[] args) throws Exception {
                  Properties props = new Properties();
                  props.put("zk.connect", "localhost:2181");
                  props.put("serializer.class", StringEncoder.class.getName());
                  //
                  ProducerConfig config = new ProducerConfig(props);
                  Producer<String, String> producer = new Producer<String, String>(config);
                  //
                  StringProducerData data = new StringProducerData("demo");
                  for(int i=0;i<100;i++) {
                      data.add("Hello world #"+i);
                  }
                  //
                  try {
                      long start = System.currentTimeMillis();
                      for (int i = 0; i < 100; i++) {
                          producer.send(data);
                      }
                      long cost = System.currentTimeMillis() - start;
                      System.out.println("send 10000 message cost: "+cost+" ms");
                  } finally {
                      producer.close();
                  }
              }


          和不使用zookeeper的消息發(fā)送者對(duì)比,只需要將服務(wù)端配置信息替換成zookeeper連接地址即可。其它完全一致。

          接收消息

          接受消息看起來稍微有點(diǎn)復(fù)雜,簡單來說是如下幾步:

          • 配置zookeeper以及客戶端groupid
          • 與服務(wù)端的連接
          • 創(chuàng)建消息流
          • 啟動(dòng)線程池消費(fèi)消息


          public static void main(String[] args) throws Exception {

              Properties props 
          = new Properties();
              props.put(
          "zk.connect""localhost:2181");
              props.put(
          "groupid""test_group");
              
          //
              ConsumerConfig consumerConfig = new ConsumerConfig(props);
              ConsumerConnector connector 
          = Consumer.create(consumerConfig);
              
          //
              Map<String, List<MessageStream<String>>> topicMessageStreams = connector.createMessageStreams(ImmutableMap.of("demo"2), new StringDecoder());
              List
          <MessageStream<String>> streams = topicMessageStreams.get("demo");
              
          //
              ExecutorService executor = Executors.newFixedThreadPool(2);
              
          final AtomicInteger count = new AtomicInteger();
              
          for (final MessageStream<String> stream : streams) {
                  executor.submit(
          new Runnable() {

                      
          public void run() {
                          
          for (String message : stream) {
                              System.out.println(count.incrementAndGet() 
          + " => " + message);
                          }
                      }
                  });
              }
              
          //
              executor.awaitTermination(1, TimeUnit.HOURS);
          所有消息的消費(fèi)方式幾乎都相同,只是消費(fèi)的topic名稱不同而已。

           


          是不是很簡單,動(dòng)手試試吧

          https://github.com/adyliu/jafka/wiki/quickstart.zh_CN


          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2012-05-11 18:48 imxylz 閱讀(9771) 評(píng)論(4)  編輯  收藏 所屬分類: Jafka

          評(píng)論

          # re: 分布式消息系統(tǒng)jafka快速起步 2012-05-17 15:50 樂百事
          希望提交至maven中央倉庫  回復(fù)  更多評(píng)論
            

          # re: 分布式消息系統(tǒng)jafka快速起步 2012-05-17 18:51 imxylz
          @樂百事
          已經(jīng)提交到Maven中央倉庫  回復(fù)  更多評(píng)論
            

          # re: 分布式消息系統(tǒng)jafka快速起步 2012-05-18 15:26 樂百事
          中央倉庫怎么搜不到??;

          pom dependency?  回復(fù)  更多評(píng)論
            

          # re: 分布式消息系統(tǒng)jafka快速起步 2012-05-18 15:55 imxylz
          http://search.maven.org/#search%7Cga%7C1%7Cjafka
          or
          http://search.maven.org/#search%7Cga%7C1%7Ccom.sohu
          or
          http://repo1.maven.org/maven2/com/sohu/jafka/jafka/1.0/  回復(fù)  更多評(píng)論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 通州市| 盘山县| 淳化县| 东山县| 铜鼓县| 榆中县| 河北区| 盘锦市| 珠海市| 名山县| 珲春市| 内乡县| 公安县| 应城市| 光山县| 从江县| 岳阳县| 滨海县| 额济纳旗| 游戏| 黔南| 白城市| 清苑县| 安阳县| 静安区| 河曲县| 深水埗区| 长武县| 西丰县| 高青县| 中宁县| 尤溪县| 新河县| 闽清县| 水城县| 新昌县| 海城市| 舞阳县| 安达市| 卫辉市| 沭阳县|