paulwong

          linkedin高吞吐量分布式消息系統kafka使用手記

          kafka是一種高吞吐量的分布式發布訂閱消息系統,她有如下特性:

          通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。
          高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數十萬的消息。
          支持通過kafka服務器和消費機集群來分區消息。
          支持Hadoop并行數據加載。

          設計側重高吞吐量,用于好友動態,相關性統計,排行統計,訪問頻率控制,批處理等系統。大部分的消息中間件能夠處理實時性要求高的消息/數據,但是對于隊列中大量未處理的消息/數據在持久性方面比較弱。

          kakfa的consumer使用拉的方式工作。


          安裝kafka
          下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

          > tar xzf kafka-.tgz
          > cd kafka-
          > ./sbt update
          > ./sbt package
          啟動zkserver:
          bin/zookeeper-server-start.sh config/zookeeper.properties
          啟動server:
          bin/kafka-server-start.sh config/server.properties
          就是這么簡單。


          使用kafka
          import java.util.Arrays;  
          import java.util.List;  
          import java.util.Properties;  
          import kafka.javaapi.producer.SyncProducer;  
          import kafka.javaapi.message.ByteBufferMessageSet;  
          import kafka.message.Message;  
          import kafka.producer.SyncProducerConfig;  
            
            
            
          Properties props = new Properties();  
          props.put(“zk.connect”, “127.0.0.1:2181”);  
          props.put("serializer.class", "kafka.serializer.StringEncoder");  
          ProducerConfig config = new ProducerConfig(props);  
          Producer<String, String> producer = new Producer<String, String>(config);  
            
          Send a single message  
            
          // The message is sent to a randomly selected partition registered in ZK  
          ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");  
          producer.send(data);      
            
          producer.close();  


          這樣就是一個標準的producer。

          consumer的代碼
          // specify some consumer properties  
          Properties props = new Properties();  
          props.put("zk.connect", "localhost:2181");  
          props.put("zk.connectiontimeout.ms", "1000000");  
          props.put("groupid", "test_group");  
            
          // Create the connection to the cluster  
          ConsumerConfig consumerConfig = new ConsumerConfig(props);  
          ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
            
          // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
          Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
              consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));  
          List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
            
          // create list of 4 threads to consume from each of the partitions   
          ExecutorService executor = Executors.newFixedThreadPool(4);  
            
          // consume the messages in the threads  
          for(final KafkaMessageStream<Message> stream: streams) {  
            executor.submit(new Runnable() {  
              public void run() {  
                for(Message message: stream) {  
                  // process message  
                }   
              }  
            });  
          }  





          posted on 2013-09-08 17:32 paulwong 閱讀(571) 評論(0)  編輯  收藏 所屬分類: LOG ANALYST BIG DATA SYSTEM

          主站蜘蛛池模板: 三台县| 沾益县| 潢川县| 宜春市| 青海省| 盐亭县| 台中县| 靖江市| 湖南省| 上虞市| 湖北省| 新邵县| 增城市| 渑池县| 巴青县| 隆昌县| 博罗县| 洪湖市| 安泽县| 广平县| 佛坪县| 富源县| 镇平县| 焉耆| 商丘市| 霍州市| 米脂县| 泰州市| 高台县| 湖北省| 布尔津县| 湄潭县| 五常市| 潜江市| 贵州省| 香港 | 延津县| 什邡市| 榆树市| 西平县| 邛崃市|