莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

              Kafka這個linkedin開源的MQ,我在過去的blog簡單介紹過。最近3周來,我的工作就是做它的一個Java移植版本,kafka是用scala寫的,基于維護和定制的角度,這個拷貝的版本還是用Java。說拷貝,也不盡然,原理相同,但實現完全換過,從數據結構到通訊框架、通訊協議、程序組織,乃至一些重要功能點上都做了改進和更新。我將這個Java版本取名為metamorphosis,也就是卡夫卡的代表作《變形記》的英文名。

              在原版本上,目前做了如下改進:
          1、協議替換為文本協議,整個協議類似memcached,文本協議的優點自不必說。通訊框架也是采用內部使用的通訊框架,減少工作量。

          2、存儲結構上也采用自定義結構,更簡潔緊湊。

          3、kafka原來只支持consumer和broker之間的服務查找和負載均衡,meta加入了producer和broker之間的服務查找和負載均衡。

          4、Consumer API沒有采用kafka的stream方式,而是同時實現同步獲取和異步訂閱兩種方式,更接近JMS和Notify。

          5、改進了服務器端文件recover的性能,采用并發多線程recover的方式(可選)。

          6、添加了實時統計功能和協議,類似memcached的stats協議,響應透明號召。

          7、客戶端的連接復用。
             
              以后要做的事情,可能包括:
          1、實現類似Mysql的master/slave方案,可能還要分為同步和異步兩種模式。

          2、分區擴展時候的數據自動遷移功能,做到無痛水平擴展。

          3、高可用方案的另一個實現。

          4、嵌入Http server做web管理。
            
              工作在本周初步告一段落,接下來是要做集成測試和壓測等,我在兩臺8核16G的機器上分別部署服務器和客戶端(訂閱者發布者同在一臺),做的一個簡單壓測數據如下:并發100個線程發送5000萬消息并同時消費,1K大小的消息TPS可以達到3.8萬,4K大小的消息TPS可以達到1.8萬,服務器load都維持在一個較低的水平。從這個數據來看,超過我一開始的預期。后續可能做下kakfa的測試對比下。


          posted @ 2011-05-07 10:46 dennis 閱讀(5532) | 評論 (6)編輯 收藏

              無論是消息系統,還是配置管理中心,甚至存儲系統,你都要面臨這樣一個選擇,push模型 or pull模型?是服務端主動給客戶端推送數據,還是客戶端去服務器拉數據,一張圖表對比如下:
           
          push模型 pull模型
          描述 服務端主動發送數據給客戶端 客戶端主動從服務端拉取數據,通常客戶端會定時拉取
          實時性 較好,收到數據后可立即發送給客戶端 一般,取決于pull的間隔時間
          服務端狀態 需要保存push狀態,哪些客戶端已經發送成功,哪些發送失敗 服務端無狀態
           客戶端狀態  無需額外保存狀態 需保存當前拉取的信息的狀態,以便在故障或者重啟的時候恢復
          狀態保存 集中式,集中在服務端 分布式,分散在各個客戶端
          負載均衡 服務端統一處理和控制 客戶端之間做分配,需要協調機制,如使用zookeeper
          其他

          服務端需要做流量控制,無法最大化客戶端的處理能力。

          其次,在客戶端故障情況下,無效的push對服務端有一定負載。

          客戶端的請求可能很多無效或者沒有數據可供傳輸,浪費帶寬和服務器處理能力
          缺點方案 服務器端的狀態存儲是個難點,可以將這些狀態轉移到DB或者key-value存儲,來減輕server壓力。

          針對實時性的問題,可以將push加入進來,push小數據的通知信息,讓客戶端再來主動pull。

          針對無效請求的問題,可以設置逐漸延長間隔時間的策略,以及合理設計協議盡量縮小請求數據包來節省帶寬。



          在面對大量甚至海量客戶端的時候,使用push模型,保存大量的狀態信息是個沉重的負擔,加上復制N份數據分發的壓力,也會使得實時性這唯一的優點也被放小。使用pull模型,通過將客戶端狀態保存在客戶端,大大減輕了服務器端壓力,通過客戶端自身做流量控制也更容易,更能發揮客戶端的處理能力,但是需要面對如何在這些客戶端之間做協調的難題。

          posted @ 2011-04-30 01:06 dennis 閱讀(4545) | 評論 (1)編輯 收藏



              HandlerSocket是一個mysql插件,可以將mysql作為NoSQL來使用,具體可以看我過去寫的這篇Bloghs4j是HandlerSocket的一個java客戶端,自認為它比日本人寫的那個客戶端更實用和易用一些。寫完好久,經過不少朋友使用和測試,現在正式發一個0.1版本,并已同步到maven中心倉庫。

              項目主頁:http://code.google.com/p/hs4j/
              項目描述:hs4j is a practical java client for HandlerSocket,it is nio based and turned to get better performance.
              使用文檔:http://code.google.com/p/hs4j/w/list
              下載地址:http://code.google.com/p/hs4j/downloads/list
              源碼倉庫:https://github.com/killme2008/hs4j

               如果你使用maven2,可以直接引用:
          <dependency>
            
          <groupId>com.googlecode.hs4j</groupId>
            
          <artifactId>hs4j</artifactId>
            
          <version>0.1</version>
          </dependency>

               有疑問和bug請聯系我。

          posted @ 2011-03-29 06:55 dennis 閱讀(3623) | 評論 (3)編輯 收藏

               Xmemcached是一個開源的java memcached client,具有高性能、更易用、功能完善等優點,距離上次發布1.3.1已經超過兩個月,現在正式發布1.3.2這個新版本,主要的改進如下:


          1、Bug修復,從1.3.1版本以來發現的bug并修復,包括:

          issue 112:: 新引入的failure模式在啟動的時候,如果memcached故障,運行不符合預期的bug.

          issue 113: 新增加一個delete方法,可以設置操作超時

          public boolean delete(final String key, long opTimeout)
                      
          throws TimeoutException, InterruptedException, MemcachedException;

          2、性能調優,存儲操作(set/add/replace/prepend/append/cas)的性能提升5%。

          3、修復pom.xml,使得xmemcached可以在其他機器上編譯。

          4、使用github作為源碼倉庫,版本管理使用git替換svn,源碼轉移到

                https://github.com/killme2008/xmemcached

          新版本下載地址:

              http://code.google.com/p/xmemcached/downloads/list

          使用maven可以直接引用: 

          <dependency>
                <groupId>com.googlecode.xmemcached</groupId>
                
          <artifactId>xmemcached</artifactId>
                
          <version>1.3.2</version>
           
          </dependency>

          項目文檔:

          http://code.google.com/p/xmemcached/w/list

          posted @ 2011-03-27 14:06 dennis 閱讀(3013) | 評論 (1)編輯 收藏


              在國內,Clojure語言的用戶估計是小眾中的小眾,沒有多少人聽說,也沒有多少人使用,資料也大多數是英文的,討論也只能上國外論壇。因此,我想建立一個CN-Clojure的google group,供大家交流和學習clojure語言。群組地址(需要翻墻):http://groups.google.com/group/cn-clojure

             現在沒人,就我一個。我也會在群組里放些學習資料,歡迎任何對clojure感興趣的朋友加入。

          posted @ 2011-01-28 19:39 dennis 閱讀(3884) | 評論 (5)編輯 收藏

               昨天晚上用clojure搞了個scheme解釋器,基本上是sicp里的解釋器的clojure翻譯版本,可能唯一值的一提的是對transient集合的使用,實現副作用的set!。總共代碼包含注釋才366行,支持的feature包括

          Feature Supported Comment
          define yes
          lambda yes
          variable lookup yes
          primitive procedure evaluation yes
          compound procedure evaluation yes no tail recursion yet
          if yes
          cond yes
          let yes

          let* yes
          no named let* yet
          letrec no
          begin yes

          set! yes

          quote yes
          quasiquote no
          unquote no
          delay no
          define-syntax no

                 支持的primitive procedure包括常見的四則運算、car/cdr、list以及display、newline等。代碼放在了github上:https://github.com/killme2008/cscheme,有興趣的可以玩玩吧。

          posted @ 2011-01-24 10:42 dennis 閱讀(3805) | 評論 (0)編輯 收藏


              最近因為空閑時間有一些,所以去看了不少開源項目,大部分東西如果看過不記錄下來,其實還是相當于沒看,所以想想還是有必要摘要記錄一下。

              首先是去了解了zookeeper這個項目,基于paxos算法的分布式服務組件,同事對此有非常深入的研究和介紹,具體可以看我們的團隊Blog。令我感慨的是這么一個非常難以理解的算法,卻用一個簡單的樹狀目錄模型表達出來,并且在這個模型的基礎上衍生出種種應用:集群感知、分布式鎖、分布式隊列、分布式并發原語等等,具體可以看文檔給出的recipes。在實現這些應用的時候,突出強調的是避免網絡風暴,例如分布式鎖的實現,競爭創建子節點,節點序列號最小的獲取鎖,其他節點等待,但是等待在什么條件上是有講究的,如果所有節點都等待最小節點的刪除事件,那么當最小節點釋放鎖的時候,就需要廣播消息給所有其他等待的節點;換一個思路,如果每個等待節點只是等待比它序列號小的節點上,那么就可以避免這種廣播風暴,變成一個順序喚醒的過程。因此盡管有了zookeeper幫助實現分布式這些服務,但是要實現好仍然有一定難度,具體可以參考官方例子。我本來萌生了基于zookeeper實現一套封裝好的類似j.u.c的服務框架,后來在郵件列表發現已經有人搞了這么一個基礎類庫放在github上:https://github.com/openUtility/menagerie 。不過我沒有繼續深入了,有興趣的朋友可以瞧瞧。

              然后又去看了我們淘寶開源的TimeTunnel。TimeTunnel你可以理解成一個消息中間件,它整個設計跟我們的產品相當接近,但是兩者的目的完全不同,tt強調的是高吞吐量,而notify強調的則是可靠性。TT的通訊層直接采用Facebook的thrift,并且利用zookeeper做集群管理和路由。TT的代碼質量很好,有興趣可以拉出來看一下,并且對zookeeper的應用也是一個典型的案例。TT在高可用性上的方案也很有特色,所有的服務器節點形成一個環,兩兩相互主輔備份,一個節點掛了,后續節點仍然可以提供服務直到主節點回來,有點類似一致性哈希的概念。節點的主從關系和順序也是通過zookeeper保證。消息順序的實現是通過稱為router的路由到固定節點做傳輸,router默認是策略不是固定而是RR。TT的數據存儲優先放在內存,并設置了一個內存狀況監視的組件,當發現內存放不下的時候,swap到磁盤文件緩存,實現類似內存換頁的功能。正常情況數據都應該在內存,當然如果可靠級別要求高的話可以先存磁盤再傳輸。TT目前仍然還是比較適合傳輸日志這樣的文本增量數據,并且提供了TailFile這樣的python腳本幫你做這個事情,這個腳本可以通過checkpoint做斷點續傳。在學習這個項目的時候,發現文檔有很大問題,要么錯誤,要么遺漏,并且代碼也不是最新的,我估計開源出來外面的人用的還不太多,希望慢慢能搞的更好一些。

              跟TT類似,另一個追求高吞吐量的MQ是linkedin開源的kafka。Kafka就跟這個名字一樣,設計非常獨特。首先,kafka的開發者們認為不需要在內存里緩存什么數據,操作系統的文件緩存已經足夠完善和強大,只要你不搞隨機寫,順序讀寫的性能是非常高效的。kafka的數據只會順序append,數據的刪除策略是累積到一定程度或者超過一定時間再刪除。Kafka另一個獨特的地方是將消費者信息保存在客戶端而不是MQ服務器,這樣服務器就不用記錄消息的投遞過程,每個客戶端都自己知道自己下一次應該從什么地方什么位置讀取消息,消息的投遞過程也是采用客戶端主動pull的模型,這樣大大減輕了服務器的負擔。Kafka還強調減少數據的序列化和拷貝開銷,它會將一些消息組織成Message Set做批量存儲和發送,并且客戶端在pull數據的時候,盡量以zero-copy的方式傳輸,利用sendfile(對應java里的FileChannel.transferTo/transferFrom)這樣的高級IO函數來減少拷貝開銷。可見,kafka是一個精心設計,特定于某些應用的MQ系統,這種偏向特定領域的MQ系統我估計會越來越多,垂直化的產品策略值的考慮。

               在此期間,我還重新去看了activemq和hornetq的存儲實現,從實現上大家都大同小異,append log + data file的模式。Activemq采用異步隊列寫來提高吞吐量,而Hornetq干脆就直接利用JNI調用原生aio來實現高性能。在搜索Java的aio實現的時候,碰巧發現Mina的沙箱里有個aioj的實現,源碼在:https://svn.apache.org/repos/asf/mina/sandbox/mheath/aioj/ 。我測試了完全可用,也嘗試改造我們的磁盤存儲組件,可惜提升不多,估計不從整個設計上調整服務器,不大可能從aio上獲益。

               最近也重新看起了clojure的一些開源項目,clojure的開源資源在github上也非常豐富,有待挖掘,下次有機會再嘗試介紹一二。
            
             
             

          posted @ 2011-01-20 23:23 dennis 閱讀(7651) | 評論 (11)編輯 收藏

              寫著玩的,不使用任何網絡框架從頭構建的echo server,總共77行。
           1 ;;Author:dennis (killme2008@gmail.com)
           2 (ns webee.network
           3    (:import (java.nio.channels Selector SocketChannel ServerSocketChannel SelectionKey)
           4             (java.net InetSocketAddress)
           5             (java.nio ByteBuffer)
           6             (java.io IOException)))
           7 
           8 (declare reactor process-keys accept-channel read-channel)
           9 
          10 (defn bind [^InetSocketAddress addr fcol]
          11   (let [selector (Selector/open)
          12         ssc      (ServerSocketChannel/open)
          13         ag  (agent selector)]
          14     (do
          15       (.configureBlocking ssc false)
          16       (.. ssc (socket) (bind addr 1000))
          17       (.register ssc selector SelectionKey/OP_ACCEPT)
          18       (send-off ag reactor fcol)
          19       ag)))
          20 
          21 (defn- reactor [^Selector selector fcol]
          22   (let [sel (. selector select 1000)]
          23     (if (> sel 0)
          24       (let [sks (. selector selectedKeys)]
          25         (do 
          26           (dorun (map (partial process-keys selector fcol) sks))
          27           (.clear sks))))
          28     (recur selector fcol)))
          29   
          30 (defn- process-keys [^Selector selector ^SelectionKey fcol sk]
          31   (try
          32     (cond 
          33       (.isAcceptable sk) (accept-channel sk  selector fcol)
          34       (.isReadable sk) (read-channel sk selector fcol)    
          35     )
          36     (catch Throwable e (.printStackTrace e))))
          37 
          38 (defn- accept-channel [^SelectionKey sk ^Selector selector fcol]
          39    (let [^ServerSocketChannel ssc (. sk channel)
          40          ^SocketChannel sc (. ssc accept)
          41          created-fn (:created fcol)]
          42      (do 
          43        (.configureBlocking sc false
          44        (.register sc selector SelectionKey/OP_READ)
          45        (if created-fn
          46          (created-fn sc)))))
          47 
          48 (defn- close-channel [^SelectionKey sk ^SocketChannel sc fcol]
          49   (let [closed-fn (:closed fcol)]
          50     (do 
          51        (.close sc)
          52        (.cancel sk)
          53        (if closed-fn 
          54          (closed-fn sc)))))
          55      
          56 (defn-  read-channel [^SelectionKey sk ^Selector selector fcol]
          57    (let [^SocketChannel sc (. sk channel)
          58          ^ByteBuffer buf (ByteBuffer/allocate 4096)
          59          read-fn (:read fcol)]
          60      (try
          61        (let [n (.read sc buf)]
          62          (if (< n 0)
          63              (close-channel sk sc fcol)
          64              (do (.flip buf)
          65                  (if read-fn
          66                    (read-fn sc buf)))))
          67        (catch IOException e
          68          (close-channel sk sc fcol)))))
          69 
          70 ;;Bind a tcp server to localhost at port 8080,you can telnet it.
          71 (def server
          72   (bind 
          73     (new InetSocketAddress 8080)
          74     {:read #(.write %1 %2)
          75      :created #(println "Accepted from" (.. % (socket) (getRemoteSocketAddress)))
          76      :closed  #(println "Disconnected from" (.. % (socket) (getRemoteSocketAddress)))
          77      }))


          posted @ 2011-01-15 22:56 dennis 閱讀(1899) | 評論 (0)編輯 收藏

                Xmemcached是一個開源的memcached的Java客戶端,最近引入了一些關鍵特性,因此版本號直接從1.2.6.2升級到1.3.0。主要的更改如下:

          1、引入了failure模式,所謂failure模式是指在當一個memcached由于各種原因不可用的情況下,發往這個節點的請求將直接拋出異常,而非使用下一個可用的節點。具體可以看memached的這個文檔。默認不啟用,啟用failure模式很簡單:

          MemcachedClientBuilder builder=……
          //啟用failure模式。
          builder.setFailureMode(true);

          也可以采用spring配置。

          2、在啟用failure模式的情況下,允許為每個memcached設置一個備份節點,當主節點掛掉的情況下,會將請求轉交給備份節點,主節點恢復后又自動切換到主節點。請注意,要設置備份節點的前提是啟用failure模式。假設我們已經有兩個memcached節點:host1:port和host2:port,為host1:port設置一個備份節點host3:port可以實現為:
          MemcachedClientBuilder builder=new XmemcachedClientBuilder(AddrUtil.getAddressMap("host1:port,host3:port host2:port"))
          ……

          主備節點之間用逗號隔開,不同分組之間用空格隔開,完全兼容1.2。并且當備份節點連接意外斷開的情況下,xmemcached也會自動修復備份節點的連接并加入映射。

          關于failure模式和standby節點更多內容可以參考這篇blog.

          3、修正BUG和新功能,包括issue 104,issue 105,issue 107等。

          項目主頁 http://code.google.com/p/xmemcached/

          下載地址 http://code.google.com/p/xmemcached/downloads/list

          用戶指南 http://code.google.com/p/xmemcached/wiki/TableOfContents

               如果你使用maven構建,可以直接引用:

          <dependency>
          <groupId>com.googlecode.xmemcached</groupId>
          <artifactId>xmemcached</artifactId>
          <version>1.3.1</version>
          </dependency>


              更新:發布1.3.1了,如果你還在使用1.3.0,建議升級。1.3.0因為改變了memcached地址服務器順序,可能導致原有的緩存失效。


          posted @ 2011-01-04 20:10 dennis 閱讀(2933) | 評論 (0)編輯 收藏


              首先,還是利用下這個小工具,查看下我10年讀過的書,看過的電影




              讀書:讀的太少,也可以看到,技術方面的更少,如果要說推薦,我只會推薦《Programming Clojure》作為學習clojure的入門,并且推薦《構建高性能web站點》作為了解一個網站構建的方放面面的入門書。

              電影:今年進電影院的次數也寥寥無幾,主要還是重看了萊昂納多的作品,《盜夢空間》很驚艷,《鋼鐵俠2》很失望。

               去年的愿望:讀完《算法導論》——2/3,繼續深入Erlang,探索Erlang在工作中的實際應用——幾乎沒有,加強對其他系統的了解以及大型網站構建方面的學習——小小一些了解,希望能全家一起去旅游一次,希望能將老爸老媽接過來玩一段時間——沒有做到。
            
               工作:狀態并不好,還是嘗試努力去做了一些事情,包括參與一些分享,更多參與他人的代碼復查和設計審查等。抱怨、牢騷少了一些,相對淡定了。

               2011年:還是不談大的愿望,從以往的經驗來說,很難靠譜。也許有一個相對明晰的目標:提高自制力和計劃性。


          posted @ 2011-01-01 09:08 dennis 閱讀(2101) | 評論 (3)編輯 收藏

          僅列出標題
          共56頁: First 上一頁 3 4 5 6 7 8 9 10 11 下一頁 Last 
          主站蜘蛛池模板: 朝阳县| 怀仁县| 旌德县| 沙坪坝区| 闽侯县| 哈尔滨市| 怀来县| 河北区| 青海省| 名山县| 杨浦区| 西林县| 桦南县| 乳山市| 田东县| 江油市| 元朗区| 桐乡市| 汉寿县| 韩城市| 广河县| 吴旗县| 施秉县| 房产| 湛江市| 南丰县| 浠水县| 霍邱县| 镇沅| 壤塘县| 太保市| 海口市| 东乌| 大安市| 五华县| 岗巴县| 麻栗坡县| 台北市| 新河县| 尉氏县| 宣城市|