對于一堆時刻在增長的數(shù)據(jù),如果要統(tǒng)計,可以采取什么方法呢?
- 等數(shù)據(jù)增長到一定程度的時候,跑一個統(tǒng)計程序進(jìn)行統(tǒng)計。適用于實時性要求不高的場景。
如將數(shù)據(jù)導(dǎo)到HDFS,再運行一個MAP REDUCE JOB。
- 如果實時性要求高的,上面的方法就不行了。因此就帶來第二種方法。
在數(shù)據(jù)每次增長一筆的時候,就進(jìn)行統(tǒng)計JOB,結(jié)果放到DB或搜索引擎的INDEX中。
STORM就是完成這種工作的。
HADOOP與STORM比較
- 數(shù)據(jù)來源:HADOOP是HDFS上某個文件夾下的可能是成TB的數(shù)據(jù),STORM是實時新增的某一筆數(shù)據(jù)
- 處理過程:HADOOP是分MAP階段到REDUCE階段,STORM是由用戶定義處理流程,
流程中可以包含多個步驟,每個步驟可以是數(shù)據(jù)源(SPOUT)或處理邏輯(BOLT) - 是否結(jié)束:HADOOP最后是要結(jié)束的,STORM是沒有結(jié)束狀態(tài),到最后一步時,就停在那,直到有新
數(shù)據(jù)進(jìn)入時再從頭開始 - 處理速度:HADOOP是以處理HDFS上大量數(shù)據(jù)為目的,速度慢,STORM是只要處理新增的某一筆數(shù)據(jù)即可
可以做到很快。 - 適用場景:HADOOP是在要處理一批數(shù)據(jù)時用的,不講究時效性,要處理就提交一個JOB,STORM是要處理
某一新增數(shù)據(jù)時用的,要講時效性
- 與MQ對比:HADOOP沒有對比性,STORM可以看作是有N個步驟,每個步驟處理完就向下一個MQ發(fā)送消息,
監(jiān)聽這個MQ的消費者繼續(xù)處理
首先要明白Storm和Hadoop的應(yīng)用領(lǐng)域,注意加粗、標(biāo)紅的關(guān)鍵字。
Hadoop是基于Map/Reduce模型的,處理海量數(shù)據(jù)的離線分析工具。
Storm是分布式的、實時數(shù)據(jù)流分析工具,數(shù)據(jù)是源源不斷產(chǎn)生的,例如Twitter的Timeline。
再回到你說的速度問題,只能說Storm更適用于實時數(shù)據(jù)流,Map/Reduce模型在實時領(lǐng)域很難有所發(fā)揮,不能簡單粗暴的說誰快誰慢。
這里的快主要是指的時延。
storm的網(wǎng)絡(luò)直傳、內(nèi)存計算,其時延必然比hadoop的通過hdfs傳輸?shù)偷枚啵划?dāng)計算模型比較適合流式時,storm的流式處理,省去了批處理的收集數(shù)據(jù)的時間;因為storm是服務(wù)型的作業(yè),也省去了作業(yè)調(diào)度的時延。所以從時延上來看,storm要快于hadoop。
說一個典型的場景,幾千個日志生產(chǎn)方產(chǎn)生日志文件,需要進(jìn)行一些ETL操作存入一個數(shù)據(jù)庫。
假設(shè)利用hadoop,則需要先存入hdfs,按每一分鐘切一個文件的粒度來算(這個粒度已經(jīng)極端的細(xì)了,再小的話hdfs上會一堆小文件),hadoop開始計算時,1分鐘已經(jīng)過去了,然后再開始調(diào)度任務(wù)又花了一分鐘,然后作業(yè)運行起來,假設(shè)機器特別多,幾鈔鐘就算完了,然后寫數(shù)據(jù)庫假設(shè)也花了很少的時間,這樣,從數(shù)據(jù)產(chǎn)生到最后可以使用已經(jīng)過去了至少兩分多鐘。
而流式計算則是數(shù)據(jù)產(chǎn)生時,則有一個程序去一直監(jiān)控日志的產(chǎn)生,產(chǎn)生一行就通過一個傳輸系統(tǒng)發(fā)給流式計算系統(tǒng),然后流式計算系統(tǒng)直接處理,處理完之后直接寫入數(shù)據(jù)庫,每條數(shù)據(jù)從產(chǎn)生到寫入數(shù)據(jù)庫,在資源充足時可以在毫秒級別完成。
當(dāng)然,跑一個大文件的wordcount,本來就是一個批處理計算的模型,你非要把它放到storm上進(jìn)行流式的處理,然后又非要讓等所有已有數(shù)據(jù)處理完才讓storm輸出結(jié)果,這時候,你再把它和hadoop比較快慢,這時,其實比較的不是時延,而是比較的吞吐了。
Hadoop M/R基于HDFS,需要切分輸入數(shù)據(jù)、產(chǎn)生中間數(shù)據(jù)文件、排序、數(shù)據(jù)壓縮、多份復(fù)制等,效率較低。
Storm 基于ZeroMQ這個高性能的消息通訊庫,不持久化數(shù)據(jù)。
kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),她有如下特性:
通過O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對于即使數(shù)以TB的消息存儲也能夠保持長時間的穩(wěn)定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數(shù)十萬的消息。
支持通過kafka服務(wù)器和消費機集群來分區(qū)消息。
支持Hadoop并行數(shù)據(jù)加載。
設(shè)計側(cè)重高吞吐量,用于好友動態(tài),相關(guān)性統(tǒng)計,排行統(tǒng)計,訪問頻率控制,批處理等系統(tǒng)。大部分的消息中間件能夠處理實時性要求高的消息/數(shù)據(jù),但是對于隊列中大量未處理的消息/數(shù)據(jù)在持久性方面比較弱。
kakfa的consumer使用拉的方式工作。
安裝kafka下載:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
啟動zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動server:
bin/kafka-server-start.sh config/server.properties
就是這么簡單。
使用kafka
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.SyncProducer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
Properties props =
new Properties();
props.put(“zk.connect”, “127.0.0.1:2181”);
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config =
new ProducerConfig(props);
Producer<String, String> producer =
new Producer<String, String>(config);
Send a single message
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);
producer.close();
這樣就是一個標(biāo)準(zhǔn)的producer。
consumer的代碼
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for(final KafkaMessageStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(Message message: stream) {
// process message
}
}
});
}
日志抓取端:
apache kafka在數(shù)據(jù)處理中特別是日志和消息的處理上會有很多出色的表現(xiàn),這里寫個索引,關(guān)于kafka的文章暫時就更新到這里,最近利用空閑時間在對kafka做一些功能性增強,并java化,雖然現(xiàn)在已經(jīng)有很多這樣的版本,但是根據(jù)實際需求來改變才是最適合的。
首先當(dāng)然推薦的是kafka的官網(wǎng) http://kafka.apache.org/
在官網(wǎng)最值得參考的文章就是kafka design:http://kafka.apache.org/design.html,我的文章也基本都是參照這里的說明,大家要特別重視這篇文章,里面有好多理念都特別好,推薦多讀幾遍。
在OSC的翻譯頻道有kafka design全中文的翻譯,翻得挺好的,推薦一下:http://www.oschina.net/translate/kafka-design
kafka的wiki是很不錯的學(xué)習(xí)文檔:https://cwiki.apache.org/confluence/display/KAFKA/Index
——————————————————————————————————
接下來就是我寫的一系列文章,文章都是循序漸進(jìn)的方式帶你了解kafka:
關(guān)于kafka的基本知識,分布式的基礎(chǔ):《分布式消息系統(tǒng)Kafka初步》
kafka的分布式搭建,quick start:《kafka分布式環(huán)境搭建》
關(guān)于kafka的實現(xiàn)細(xì)節(jié),這主要就是講design的部分:《細(xì)節(jié)上》、《細(xì)節(jié)下》
關(guān)于kafka開發(fā)環(huán)境,scala環(huán)境的搭建:《開發(fā)環(huán)境搭建》
數(shù)據(jù)生產(chǎn)者,producer的用法:《producer的用法》、《producer使用注意》
數(shù)據(jù)消費者,consumer的用法:《consumer的用法》
還有些零碎的,關(guān)于通信段的源碼解讀:《net包源碼解讀》、《broker配置》
——————————————————————————————————
擴展的閱讀還有下面這些:
我的好友寫的關(guān)于kafka和jafka的相關(guān)博客,特別好,我有很多問題也都找他解決的,大神一般的存在:http://rockybean.github.com/ @rockybean
kafka的java化版本jafka:https://github.com/adyliu/jafka
淘寶的metaQ:https://github.com/killme2008/Metamorphosis
我最近在寫的inforQ,剛開始寫,我也純粹是為了讀下源碼,不定期更新哈:https://github.com/ielts0909/inforq
后面一階段可能更新點兒關(guān)于cas的東西吧,具體也沒想好,最近一直出差,寫代碼的時間都很少
--------------------------------------------------------------------------------
0.8版本的相關(guān)更新如下:
0.8更新內(nèi)容介紹:《kafka0.8版本的一些更新》
如果簡單地比較Redis與Memcached的區(qū)別,大多數(shù)都會得到以下觀點:
1 Redis不僅僅支持簡單的k/v類型的數(shù)據(jù),同時還提供list,set,hash等數(shù)據(jù)結(jié)構(gòu)的存儲。
2 Redis支持?jǐn)?shù)據(jù)的備份,即master-slave模式的數(shù)據(jù)備份。
3 Redis支持?jǐn)?shù)據(jù)的持久化,可以將內(nèi)存中的數(shù)據(jù)保持在磁盤中,重啟的時候可以再次加載進(jìn)行使用。
在Redis中,并不是所有的數(shù)據(jù)都一直存儲在內(nèi)存中的。這是和Memcached相比一個最大的區(qū)別(我個人是這么認(rèn)為的)。
Redis只會緩存所有的key的信息,如果Redis發(fā)現(xiàn)內(nèi)存的使用量超過了某一個閥值,將觸發(fā)swap的操作,Redis根據(jù)“swappability = age*log(size_in_memory)”計算出哪些key對應(yīng)的value需要swap到磁盤。然后再將這些key對應(yīng)的value持久化到磁盤中,同時在內(nèi)存中清除。這種特性使得Redis可以保持超過其機器本身內(nèi)存大小的數(shù)據(jù)。當(dāng)然,機器本身的內(nèi)存必須要能夠保持所有的key,畢竟這些數(shù)據(jù)是不會進(jìn)行swap操作的。
同時由于Redis將內(nèi)存中的數(shù)據(jù)swap到磁盤中的時候,提供服務(wù)的主線程和進(jìn)行swap操作的子線程會共享這部分內(nèi)存,所以如果更新需要swap的數(shù)據(jù),Redis將阻塞這個操作,直到子線程完成swap操作后才可以進(jìn)行修改。
可以參考使用Redis特有內(nèi)存模型前后的情況對比:
VM off: 300k keys, 4096 bytes values: 1.3G used
VM on: 300k keys, 4096 bytes values: 73M used
VM off: 1 million keys, 256 bytes values: 430.12M used
VM on: 1 million keys, 256 bytes values: 160.09M used
VM on: 1 million keys, values as large as you want, still: 160.09M used
當(dāng)從Redis中讀取數(shù)據(jù)的時候,如果讀取的key對應(yīng)的value不在內(nèi)存中,那么Redis就需要從swap文件中加載相應(yīng)數(shù)據(jù),然后再返回給請求方。這里就存在一個I/O線程池的問題。在默認(rèn)的情況下,Redis會出現(xiàn)阻塞,即完成所有的swap文件加載后才會相應(yīng)。這種策略在客戶端的數(shù)量較小,進(jìn)行批量操作的時候比較合適。但是如果將Redis應(yīng)用在一個大型的網(wǎng)站應(yīng)用程序中,這顯然是無法滿足大并發(fā)的情況的。所以Redis運行我們設(shè)置I/O線程池的大小,對需要從swap文件中加載相應(yīng)數(shù)據(jù)的讀取請求進(jìn)行并發(fā)操作,減少阻塞的時間。
redis、memcache、mongoDB 對比
從以下幾個維度,對redis、memcache、mongoDB 做了對比,歡迎拍磚
1、性能
都比較高,性能對我們來說應(yīng)該都不是瓶頸
總體來講,TPS方面redis和memcache差不多,要大于mongodb
2、操作的便利性
memcache數(shù)據(jù)結(jié)構(gòu)單一
redis豐富一些,數(shù)據(jù)操作方面,redis更好一些,較少的網(wǎng)絡(luò)IO次數(shù)
mongodb支持豐富的數(shù)據(jù)表達(dá),索引,最類似關(guān)系型數(shù)據(jù)庫,支持的查詢語言非常豐富
3、內(nèi)存空間的大小和數(shù)據(jù)量的大小
redis在2.0版本后增加了自己的VM特性,突破物理內(nèi)存的限制;可以對key value設(shè)置過期時間(類似memcache)
memcache可以修改最大可用內(nèi)存,采用LRU算法
mongoDB適合大數(shù)據(jù)量的存儲,依賴操作系統(tǒng)VM做內(nèi)存管理,吃內(nèi)存也比較厲害,服務(wù)不要和別的服務(wù)在一起
4、可用性(單點問題)
對于單點問題,
redis,依賴客戶端來實現(xiàn)分布式讀寫;主從復(fù)制時,每次從節(jié)點重新連接主節(jié)點都要依賴整個快照,無增量復(fù)制,因性能和效率問題,
所以單點問題比較復(fù)雜;不支持自動sharding,需要依賴程序設(shè)定一致hash 機制。
一種替代方案是,不用redis本身的復(fù)制機制,采用自己做主動復(fù)制(多份存儲),或者改成增量復(fù)制的方式(需要自己實現(xiàn)),一致性問題和性能的權(quán)衡
Memcache本身沒有數(shù)據(jù)冗余機制,也沒必要;對于故障預(yù)防,采用依賴成熟的hash或者環(huán)狀的算法,解決單點故障引起的抖動問題。
mongoDB支持master-slave,replicaset(內(nèi)部采用paxos選舉算法,自動故障恢復(fù)),auto sharding機制,對客戶端屏蔽了故障轉(zhuǎn)移和切分機制。
5、可靠性(持久化)
對于數(shù)據(jù)持久化和數(shù)據(jù)恢復(fù),
redis支持(快照、AOF):依賴快照進(jìn)行持久化,aof增強了可靠性的同時,對性能有所影響
memcache不支持,通常用在做緩存,提升性能;
MongoDB從1.8版本開始采用binlog方式支持持久化的可靠性
6、數(shù)據(jù)一致性(事務(wù)支持)
Memcache 在并發(fā)場景下,用cas保證一致性
redis事務(wù)支持比較弱,只能保證事務(wù)中的每個操作連續(xù)執(zhí)行
mongoDB不支持事務(wù)
7、數(shù)據(jù)分析
mongoDB內(nèi)置了數(shù)據(jù)分析的功能(mapreduce),其他不支持
8、應(yīng)用場景
redis:數(shù)據(jù)量較小的更性能操作和運算上
memcache:用于在動態(tài)系統(tǒng)中減少數(shù)據(jù)庫負(fù)載,提升性能;做緩存,提高性能(適合讀多寫少,對于數(shù)據(jù)量比較大,可以采用sharding)
MongoDB:主要解決海量數(shù)據(jù)的訪問效率問題