Author:放翁(文初)
Email:fangweng@taobao.com
Mblog:weibo.com/fangweng
Blog: http://blog.csdn.net/cenwenchu79/
局部設計
首先要說明的是,這部分內(nèi)容和第一篇不同,必須對照代碼看才會理解其中的含義,光看設計實現(xiàn)會比較難懂其中所說的細節(jié)點。代碼:https://github.com/cenwenchu/beatles
IComponent:
做系統(tǒng)就像搭積木一樣,這些組件最后都會拼裝起來,而積木往往由于內(nèi)部機制可定制化需要config,同時組合在一起的積木往往會有一個傳遞的Config(可以認為是靜態(tài)的Context)。
INode:
Node的設計比較簡單,自身是Runnable的單線程循環(huán)體,內(nèi)置一個單線程事件監(jiān)聽分
發(fā)器。Node的主線程主要負責該Node自身產(chǎn)生的事件處理(常規(guī)已知事件處理):Master就是維護任務列表狀態(tài),并根據(jù)任務執(zhí)行情況做一些Action,Slave就是重復的獲取,執(zhí)行任務,返回結(jié)果。而Node中的事件監(jiān)聽器主要用于外部消息驅(qū)動事件處理(偶發(fā)性事件處理),例如Master收到Slave的請求,外部要求導出載入中間結(jié)果等。這里會發(fā)現(xiàn)采用的是單線程阻塞檢查獲取事件:
1.多線程并發(fā)檢查事件,對于事件承載者(隊列)就要求做好并發(fā)控制,也就是最終在獲取事件的過程中依然是串行化,所以大部分事件處理框架對于事件檢查都采用單線程,簡單高效。
2.單線程可以用于檢查事件,但執(zhí)行事件會采用多線程或者主線程直接處理,取決于事件執(zhí)行速度和可靠性(外部依賴),如果事件可以快速執(zhí)行(無外部依賴,邏輯簡單),則采用檢查線程直接處理(NIO中對于連接建立和銷毀直接在主線程中處理掉,這里Master對于獲取任務事件的前半段采用直接處理),如果事件消耗時間較久,或者依賴與外部系統(tǒng)穩(wěn)定性和處理情況,則需要采用多線程異步處理(這里很多都是讓內(nèi)部組件來保證方法執(zhí)行的快速返回,例如JobExporter的所有方法等處理都是內(nèi)部線程異步完成,對外接口快速返回),如果必須要有結(jié)果回執(zhí),那么可以采用回調(diào)模式或者直接提交新事件(帶有上下文可以接著上次處理)到事件處理引擎。
3.另一種方式會選擇將事件分開放置來提高處理效率,例如TimeOut事件和普通外部激發(fā)事件,注意盡量避免通過輪詢對象狀態(tài)來判斷事件發(fā)生,除了Timeout必須這么做,其他盡量通過對對象狀態(tài)變更操作內(nèi)置事件產(chǎn)生器來創(chuàng)建真實事件,這樣事件處理者只需要阻塞等待事件即可,系統(tǒng)內(nèi)狀態(tài)規(guī)模增大對于事件處理時間復雜度還是O(1)。對于Timeout這類事件后續(xù)介紹SlaveConnector中有更詳細介紹,至此略過。那是否需要將不同業(yè)務處理放置在不同的隊列交由不同的單線程處理,取決于系統(tǒng)事件產(chǎn)生速度,就好比NIO中可以起多個Selector來處理,由于起多個單線程守護到隊列如果利用率不高,其實對于系統(tǒng)來說也是一種負擔,所以可以做成可配置的方式提供給外部(beatles中是沒有提供配置,就一個線程,因為本身并不是大并發(fā)的web前端系統(tǒng),接上千個slave的話消息量分布也不會非常密集,畢竟任務分析本身需要消耗時間)。
MasterNode中有兩個組件:JobManager和MasterConnector,一個負責上層業(yè)務處理,一個負責消息通信,在MasterNode運作的時候,兩者其實需要協(xié)調(diào)工作,例如MasterConnector可能會收到消息,需要提交給JobManager處理并獲得結(jié)果返回。為了實現(xiàn)內(nèi)部組件不會相互依賴(MasterNode內(nèi)部成為網(wǎng)狀結(jié)構(gòu)),采用MasterNode作為中間消息傳遞者,通過事件或回調(diào)方式相互驅(qū)動,同時利用上下文(將Channel作為Event的一部結(jié)構(gòu),用于后續(xù)消息返回)來傳遞一些環(huán)境信息。需要注意的是,這種解耦的做法勢必帶來性能的下降,因此可以和前面提到的事件處理為多線程還是單線程一樣,對于消息機制的依賴也不要盲從,按需使用,例如Connector通過事件提交給MasterNode,MasterNode接收事件后調(diào)用JobManager處理,處理后的結(jié)果也可以利用事件機制反向驅(qū)動Master去調(diào)用Connector,但也可以直接將MasterNode植入JobManager,反向利用代理模式來直接處理,這里關鍵看你是否需要釋放掉你當前的線程,讓任務異步去做,而當前線程可以回收去做更多的處理,帶來的是線程切換和事件驅(qū)動的消耗。不過總體上來說讓組件的宿主來完成交互,能夠減少模塊間依賴帶來的耦合性和復雜度。
FileJobExporter
這個類主要用于文件輸出,但在輸出部分的代碼中有lazymerge的部分,所謂的lazy merge指的是部分entry<key,value>的結(jié)果是依賴于處理后的部分結(jié)果而得到,例如成功率這個指標就是用成功數(shù)/總數(shù)。作為分析系統(tǒng)來說,如果成功數(shù)的<key,value>需要長期保存,總數(shù)的<key,value>需要長期保存,那是否需要在最終產(chǎn)出報表以前就將成功率的<key,value>計算并保存在內(nèi)存中呢?其實大可不必,不僅浪費了cpu資源,也浪費了大量的內(nèi)存資源,同時slave傳遞給master還會使得網(wǎng)絡io消耗增大。在beatles中認為export就是最后的一步,因此在這個時候做計算和導出。在我們很多系統(tǒng)中,考慮一下很多中間結(jié)果是否需要輸出,還是保留在最后一步輸出(并不是保留在最后一步一定好,取決于代價,如果最后一步有大量的計算要做,那么可以用內(nèi)存換機算,提早計算來減緩最后導出時的壓力,如果導出時計算不大,而系統(tǒng)整體處理內(nèi)存資源緊張,那么就滯后處理)。衍生開來很多時候,需要考慮重復計算帶來的成本和節(jié)省內(nèi)存帶來的收獲誰更有利,如果計算節(jié)點分散且規(guī)模巨大,則可以靠慮利用外部計算能力來減少集中式處理的代價(好比很多前端處理的結(jié)果可以滯后到客戶端處理而不是服務端集中處理,開放平臺的數(shù)據(jù)序列化推后到業(yè)務方集群處理而不是開放平臺統(tǒng)一處理)
JobManager
由于MasterNode中是單線程調(diào)用,因此對于任務狀態(tài)變更變得非常簡單(無需并發(fā)控制和原子操作),但由于MasterNode將來還是可擴展為多個線程處理,因此暫時保留原子操作的處理模式。
1. 對于對象狀態(tài)管理,如果對象層次比較多,盡量扁平化處理,就好比把TaskStatus直接保存,有利于檢查和原子操作,帶來的問題就是另一部分對象的狀態(tài)同步變更(Task中的狀態(tài)),其實簡單來說就是兩個數(shù)據(jù)結(jié)構(gòu)修改要做到事務性,做法比較簡單,細粒度的原子操作模擬鎖爭奪,例如要修改Task的Status首先要并發(fā)的修改TaskStatus的數(shù)據(jù)(if (statusPool.replace(taskId, JobTaskStatus.DOING, JobTaskStatus.DONE)),如果修改成功,才可以修改原始對象內(nèi)的數(shù)據(jù)。其實如果是單線程都不需要并發(fā)控制(因為并發(fā)的模式還是有些消耗的)。
2. 事件驅(qū)動模型中很重要一點就是事件狀態(tài)必須在所有必要操作后再改變(即創(chuàng)建事件),例如:早一個版本中,Master收到Slave返回結(jié)果時,將會把結(jié)果設置到Master的某一個Task的result屬性中,同時改變Task的狀態(tài)為done,這兩個動作就必須保持一定的順序,也就是先要把內(nèi)容設置進去,然后再改變狀態(tài),因為如果先改變狀態(tài),外部事件處理線程如果發(fā)現(xiàn)狀態(tài)已經(jīng)改變,又沒有鎖保證結(jié)果放進去以前不能處理這個事件,就會發(fā)現(xiàn)事件開始被處理了,但是內(nèi)容還是錯過了處理,出現(xiàn)線程并發(fā)問題。這點在這個版本的源碼注釋上面有點問題,后續(xù)修改掉它。
3. 在主流程上有一個方法mergeAndExportJobs,用于檢查Job內(nèi)部的Tasks完成狀態(tài),決定是否合并或者導出結(jié)果,首先受限制于JobManager主流程是單線程處理,同時內(nèi)部Tasks狀態(tài)隨時會變,因此要求主流程的所有操作和檢查都必須非阻塞,保證處理的即時性,但如果這個方法里面的所有操作都變成另起線程異步處理的話,就同樣會發(fā)生上面我談過的事件檢查多線程模式最終還是會并發(fā)控制下變成串行化,效率不升反降,因此采用同一業(yè)務性數(shù)據(jù)處理守護進程唯一性的方式(其實簡單來說就是在這里Master中管理多個Job,多個Job其實就好比多個事件隊列,因此必須并行處理,否則會有互相影響的風險,但是單個Job的處理可以只有一個守護線程處理,因此對Job加事件鎖,保證不同Job之間同一個事件并行,同一個job不同事件并行(這里由于都是順序化的,雖然并行了,但還是要等待上一個事件完成后才會修改內(nèi)部狀態(tài)繼續(xù)往下走))
4. 在第一篇里面說到,這個框架對于任務執(zhí)行異常的處理十分簡單,事先規(guī)定好單個任務執(zhí)行的最長可接受時間,如果到了時間尚未獲得反饋,就認為出現(xiàn)問題,任務重置可以接受下一個計算節(jié)點的處理請求。(結(jié)果誰先返回就用誰的)這里其實要注意兩點:任務時間可評估是基于任務切分粒度夠細,其實很多時候可以考慮通過任務細化來降低任務出現(xiàn)問題解決的復雜度,同時也可以降低計算節(jié)點重新做任務的代價。另一方面需要設置重置次數(shù)透明化,保證如果任務本身有問題(例如數(shù)據(jù)來源出現(xiàn)問題),不會使得所有的計算節(jié)點陷入單個任務處理死循環(huán)。
5. 合并數(shù)據(jù)的代碼優(yōu)化:
A. Master合并時每一個Job只有一個主干,也就是最后job的所有Task Result都必須合并到這個主干,假設這是個svn主干,可以想象多個人(多線程)是無法并行合并的。那么當主線程在A時刻發(fā)現(xiàn)有4個結(jié)果需要合并的時候,它開始把4個結(jié)果合并到主干,合并的過程中可能又來了3個結(jié)果,那么這三個結(jié)果就必須等待下一輪的合并開始,此時這三個結(jié)果耗費的內(nèi)存就會增加系統(tǒng)的負擔,同時系統(tǒng)如果Slave越多,這樣的情況越嚴重。因此引入下面一種模式,多線程合并,但主干和虛擬分支同時進行,當需要合并時首先競爭主干鎖,得到主干鎖的線程將這次需要合并的結(jié)果和以前合并的虛擬分支一起合并到主干,而如果沒有得到主干鎖的線程并行的合并結(jié)果到虛擬分支上。此時充分利用多核的計算能力來壓縮對于內(nèi)存的需求(結(jié)果合并后會大大減少存儲的需求)。
B. 由于A中的描述可以看到,主干在整個Job的任務執(zhí)行合并過程中都被保存在內(nèi)存中,因此當結(jié)果集越大,主干對系統(tǒng)內(nèi)存消耗就越大,而Job的多輪合并是否可以最后載入上一輪的主干和本輪增量結(jié)果合并,這樣可以大大減少內(nèi)存消耗,但是內(nèi)容的導出和載入帶來的序列化代價和IO的消耗勢必會增加每一輪的處理時間,和減少GC帶來的節(jié)省時間的優(yōu)勢可能會沖抵甚至有負面效果。因此通過異步載入和導出,即節(jié)省了內(nèi)存占用,減少FullGC帶來的停頓,又不影響處理,另一方面其實也是利用兩個階段的CPU閑置率較高來交換內(nèi)存的代價。(這部分代碼參看jobexporter和jobmanager)
SlaveNode:
充分利用Slave單機CPU的方式可以是:一臺機器可以跑多個Slave。也可以跑一個Slave,單個Slave一次要求獲取多個Task,這樣可以并行利用多個cpu處理多個任務。
為了減少Master的合并壓力,可以讓Slave直接輸出,也可以通過Slave要求多個Task,執(zhí)行完多個Task在本地合并(Task必須是同一個Job才可以合并),再將合并后的結(jié)果會送給Master。
對于同一個數(shù)據(jù)源可以通過創(chuàng)建同樣的多個Task來增加對其的處理速度,例如A機器的日志增長比B機器的快,那么可以配置,兩個數(shù)據(jù)來源是A機器的Task,配置一個B機器的Task,來差別對待處理速度。
對于處理后的數(shù)據(jù)如果還需要二次處理,可以構(gòu)建Job的數(shù)據(jù)來源是一次處理后的數(shù)據(jù)輸出地,當一次數(shù)據(jù)輸出以后,自然二次處理才會開始。
簡單來說,很多復雜的sharding設計,reduce的考慮,任務迭代處理,其實都可以通過扁平化的方式來解決,有時候花很很大力氣去做的看似很fancy的設計,不如歸一化處理。(再大的數(shù)字都是從一衍生出來的)
Connector:
這部分設計主要是屏蔽掉分布式概念的誤區(qū),很多分布式設計開始的時候不是注重對于主節(jié)點和次節(jié)點的業(yè)務交互上,而是糾結(jié)于底層設計上,最后就會落得調(diào)試難,擴展難的情形。和上面的歸一化設計思想一樣,所謂的分布式其實可以是一個進程內(nèi)(虛擬機內(nèi))的交互協(xié)作,一臺機器多進程的交互協(xié)作,多臺機器多進程的交互協(xié)作,因此如何能夠適合這三個場景,就會讓設計變得簡單,容易擴展,實現(xiàn)與接口分離。
Event:
Event中需要考慮一些上下文設計,例如序列號保證松散交互的會話可維護性,Channel等后續(xù)操作的基礎傳遞。Event盡量做到無業(yè)務侵入,例如雖然需要Channel,但不同的實現(xiàn)Channel是不同的,MemChannel和SocketChannel就不同,將來擴展更是不同,做好一些就抽象一些接口(但可能需要對一些實現(xiàn)做外殼封裝適應接口),或者就直接Object弱化類型。
InputAdaptor&OutputAdaptor:
任務的自描述性除了業(yè)務規(guī)則的自描述性,更需要輸入輸出的自描述性,所有計算歸結(jié)到底無非是輸入,處理,輸出,如果三者定義清楚,并且可以通過支持協(xié)議擴展適配,那么對于計算節(jié)點來說就非常通用了,不必因為業(yè)務的差別,數(shù)據(jù)來源和輸出的差別來分別建立多個集群,最終還是發(fā)現(xiàn)多個集群無法很好的充分利用資源的高低峰(對于明確要保護的計算集群可以直接構(gòu)建,對于一些非關鍵性的計算任務可以丟到一個集群中搞定),降低成本。
Job:
本身是一組任務的集合,自身有多個狀態(tài)位,當前通過多個狀態(tài)來表示(可以合并為一個原子狀態(tài)位),內(nèi)置一些鎖來控制主干的并發(fā)訪問,守護進程的分配。(這點在另一個PipeComet項目中對于長連接管道下行守護進程按需分配也有充分利用到)。
Operation:
這個包里面是將耗時的操作封裝為可以被外部線程獨立執(zhí)行的Runnable,可以看見在整體代碼里面有用外部線程異步執(zhí)行的,也有直接在線程里面阻塞執(zhí)行的,取決于對于結(jié)果返回的同步性需求,如果同步性需求明確,那么可以用異步+鎖的方式來模擬同步,也可以直接同步,但前者代價較大,所以將這類操作抽象,上下文通過參數(shù)傳遞來構(gòu)建出可以異步也可以同步執(zhí)行的邏輯塊,提高了功能執(zhí)行的靈活性。
CreateReportOperation中的輸出模式還是比較節(jié)省空間的,可以看一下如何基于<key,value>列矩陣輸出報表這樣的行式記錄保持對內(nèi)存較小的占用。
ReportUtil:
是個工具大雜燴。
1. mergeEntryResult。將多個矩陣結(jié)果合并的函數(shù),里面有不少的節(jié)省內(nèi)存的做法,首先選取第一個矩陣作為base,節(jié)省申請和合并的過程,合并過程中不斷刪除合并后的數(shù)據(jù),節(jié)省后續(xù)合并成本,釋放資源。
2. compressString。嘗試采用不可逆壓縮來減少處理中中間key占用的內(nèi)存,例如每一個entry的key是幾個列的組合,而key僅僅表示唯一性,如果能夠做到壓縮且不失唯一性,那么最終不會影響需要輸出的結(jié)果。這里采用短鏈接的處理方式。(md5+16以上的進制模式)
TimeOutQueue:
最前面提到過,基本所有的外部對象狀態(tài)變更都可以被捕獲,然后產(chǎn)生一個事件,而超時事件必須是主動檢查才可以判斷,因此當對象數(shù)據(jù)量增加的時候,超時檢查的消耗就會變成O(N),一般會推薦使用分區(qū)模式(時間輪盤,時間槽)來縮減N增加帶來的影響,另一種方式比較適合于超時時間不會變動的情況,就好比將一個對象放入后,它的超時時間從創(chuàng)建初期到銷毀都不會再改變,如果是這種情況,那么可以采用這個類的實現(xiàn)方式。
內(nèi)置一個有順序的單向鏈或者隊列,按照超時時間的前后建立先后順序,最早超時的對象放在最前面,內(nèi)部線程每次從隊列或者鏈的第一位開始檢查,如果發(fā)現(xiàn)超時,則處理,繼續(xù)往后走,當發(fā)現(xiàn)沒有超時的時候,獲得該對象距離超時時間的間隔,然后掛起這間隔的時間。期間有任何數(shù)據(jù)加入,如果超時時間小于隊列第一個對象超時時間,則加入隊列,然后喚醒檢查線程(切記順序不要反,先加入隊列,再喚醒)。最后在增加一個防止隊列為空的消費者生產(chǎn)者的標識,保證不要空循環(huán)。