一、引言
要實現(xiàn)一整套能用于大用戶量、高并發(fā)場景下的IM群聊,技術(shù)難度遠超IM系統(tǒng)中的其它功能,原因在于:IM群聊消息的實時寫擴散特性帶來了一系列技術(shù)難題。
舉個例子:如一個2000人群里,一條普通消息的發(fā)出問題,將瞬間寫擴散為2000條消息的接收問題,如何保證這些消息的及時、有序、高效地送達,涉及到的技術(shù)問題點實在太多,更別說個別場景下萬人大群里的炸群消息難題了更別說個別場景下萬人大群里的炸群消息難題了。
這也是為什么一般中大型IM系統(tǒng)中,都會將群聊單獨拎出來考慮架構(gòu)的設(shè)計,單獨有針對性地進行架構(gòu)優(yōu)化,從而降低整個系統(tǒng)的設(shè)計難度。
本文將分享的是一套生產(chǎn)環(huán)境下的IM群聊消息系統(tǒng)的高可用、易伸縮、高并發(fā)架構(gòu)設(shè)計實踐,屬于原創(chuàng)第一手資料,內(nèi)容較專業(yè),適合有一定IM架構(gòu)經(jīng)驗的后端程序員閱讀。
推薦:如有興趣,本文作者的另一篇《一套原創(chuàng)分布式即時通訊(IM)系統(tǒng)理論架構(gòu)方案》,也適合正在進行IM系統(tǒng)架構(gòu)設(shè)計研究的同學閱讀。
學習交流:
- 即時通訊開發(fā)交流3群:185926912[推薦]
- 移動端IM開發(fā)入門文章:《新手入門一篇就夠:從零開發(fā)移動端IM》
(本文同步發(fā)布于:http://www.52im.net/thread-2015-1-1.html)
二、群聊技術(shù)文章
《IM群聊消息究竟是存1份(即擴散讀)還是存多份(即擴散寫)?》
《IM群聊消息的已讀回執(zhí)功能該怎么實現(xiàn)?》
《現(xiàn)代IM系統(tǒng)中聊天消息的同步和存儲方案探討》
《移動端IM中大規(guī)模群消息的推送如何保證效率、實時性?》
《微信后臺團隊:微信后臺異步消息隊列的優(yōu)化升級實踐分享》
《IM單聊和群聊中的在線狀態(tài)同步應該用“推”還是“拉”?》
《快速裂變:見證微信強大后臺架構(gòu)從0到1的演進歷程(一)》
三、萬事開頭難:初始的極簡實現(xiàn)
所謂的群聊消息系統(tǒng),就是一種多對多群體聊天方式,譬如直播房間內(nèi)的聊天室對應的服務(wù)器端就是一個群聊消息系統(tǒng)。
2017年9月初,我們初步實現(xiàn)了一套極簡的群聊消息系統(tǒng),其大致架構(gòu)如下:
系統(tǒng)名詞解釋:
1)Client : 消息發(fā)布者【或者叫做服務(wù)端群聊消息系統(tǒng)調(diào)用者】,publisher;
2)Proxy : 系統(tǒng)代理,對外統(tǒng)一接口,收集Client發(fā)來的消息轉(zhuǎn)發(fā)給Broker;
3)Broker :系統(tǒng)消息轉(zhuǎn)發(fā)Server,Broker 會根據(jù) Gateway Message 組織一個 RoomGatewayList【key為RoomID,value為 Gateway IP:Port 地址列表】,然后把 Proxy 發(fā)來的消息轉(zhuǎn)發(fā)到 Room 中所有成員登錄的所有 Gateway;
4)Router :用戶登錄消息轉(zhuǎn)發(fā)者,把Gateway轉(zhuǎn)發(fā)來的用戶登入登出消息轉(zhuǎn)發(fā)給所有的Broker;
5)Gateway :所有服務(wù)端的入口,接收合法客戶端的連接,并把客戶端的登錄登出消息通過Router轉(zhuǎn)發(fā)給所有的Broker;
6)Room Message : Room聊天消息;
7)Gateway Message : Room內(nèi)某成員 登錄 或者 登出 某Gateway消息,包含用戶UIN/RoomID/Gateway地址{IP:Port}等消息。
當一個 Room 中多個 Client 連接一個 Gateway 的時候,Broker只會根據(jù) RoomID 把房間內(nèi)的消息轉(zhuǎn)發(fā)一次給這個Gateway,由Gateway再把消息復制多份分別發(fā)送給連接這個 Gateway 的 Room 中的所有用戶的客戶端。
這套系統(tǒng)有如下特點:
1)系統(tǒng)只轉(zhuǎn)發(fā)房間內(nèi)的聊天消息,每個節(jié)點收到后立即轉(zhuǎn)發(fā)出去,不存儲任何房間內(nèi)的聊天消息,不考慮消息丟失以及消息重復的問題;
2)系統(tǒng)固定地由一個Proxy、三個Broker和一個Router構(gòu)成;
3)Proxy接收后端發(fā)送來的房間消息,然后按照一定的負載均衡算法把消息發(fā)往某個Broker,Broker則把消息發(fā)送到所有與Room有關(guān)系的接口機Gateway;
4)Router接收Gateway轉(zhuǎn)發(fā)來的某個Room內(nèi)某成員在這個Gateway的登出或者登錄消息,然后把消息發(fā)送到所有Broker;
5)Broker收到Router轉(zhuǎn)發(fā)來的Gateway消息后,更新(添加或者刪除)與某Room相關(guān)的Gateway集合記錄;
6)整個系統(tǒng)的通信鏈路采用UDP通信方式。
從以上特點,整個消息系統(tǒng)足夠簡單,沒有考慮擴縮容問題,當系統(tǒng)負載到達極限的時候,就重新再部署一套系統(tǒng)以應對后端client的消息壓力。
這種處理方式本質(zhì)是把系統(tǒng)的擴容能力甩鍋給了后端Client以及前端Gateway:每次擴容一個系統(tǒng),所有Client需要在本地配置文件中添加一個Proxy地址然后全部重啟,所有Gateway則需要再本地配置文件添加一個Router地址然后全部重啟。
這種“幸福我一人,辛苦千萬家”的擴容應對方式,必然導致公司內(nèi)部這套系統(tǒng)的使用者怨聲載道,下一階段的升級就是必然的了。
四、進一步重點設(shè)計:“可擴展性”
4.1、基本思路
大道之行也,天下為公,不同的系統(tǒng)有不同的構(gòu)架,相同的系統(tǒng)總有類似的實現(xiàn)。類似于數(shù)據(jù)庫的分庫分表【關(guān)于分庫分表,目前看到的最好的文章是《一種支持自由規(guī)劃無須數(shù)據(jù)遷移和修改路由代碼的Replicaing擴容方案》】,其擴展實現(xiàn)核心思想是分Partition分Replica,但各Replica之間還區(qū)分leader(leader-follower,只有l(wèi)eader可接受寫請求)和non-leader(所有replica均可接收寫請求)兩種機制。
從數(shù)據(jù)角度來看,這套系統(tǒng)接收兩種消息:Room Message(房間聊天消息)和Gateway Message(用戶登錄消息)。兩種消息的交匯之地就是Broker,所以應對擴展的緊要地方就是Broker,Broker的每個Partition采用non-leader機制,各replica均可接收Gateway Message消息寫請求和Room Message轉(zhuǎn)發(fā)請求。
首先,當Room Message量加大時可以對Proxy進行水平擴展,多部署Proxy即可因應Room Message的流量。
其次,當Gateway Message量加大時可以對Router進行水平擴展,多部署Router即可因應Gateway Message的流量。
最后,兩種消息的交匯之地Broker如何擴展呢?可以把若干Broker Replica組成一個Partition,因為Gateway Message是在一個Partition內(nèi)廣播的,所有Broker Replica都會有相同的RoomGatewayList 數(shù)據(jù),因此當Gateway Message增加時擴容Partition即可。當Room Message量增加時,水平擴容Partition內(nèi)的Broker Replica即可,因為Room Message只會發(fā)送到Partition內(nèi)某個Replica上。
從個人經(jīng)驗來看,Room ID的增長以及Room內(nèi)成員的增加量在一段時間內(nèi)可以認為是直線增加,而Room Message可能會以指數(shù)級增長,所以若設(shè)計得當則Partition擴容的概率很小,而Partition內(nèi)Replica水平增長的概率幾乎是100%。
不管是Partition級別的水平擴容還是Partition Replica級別的水平擴容,不可能像系統(tǒng)極簡版本那樣每次擴容后都需要Client或者Gateway去更新配置文件然后重啟,因應之道就是可用zookeeper充當角色的Registriy。通過這個zookeeper注冊中心,相關(guān)角色擴容的時候在Registry注冊后,與之相關(guān)的其他模塊得到通知即可獲取其地址等信息。采用zookeeper作為Registry的時候,所以程序?qū)崿F(xiàn)的時候采用實時watch和定時輪詢的策略保證數(shù)據(jù)可靠性,因為一旦網(wǎng)絡(luò)有任何的抖動,zk就會認為客戶端已經(jīng)宕機把鏈接關(guān)閉。
分析完畢,與之相對的架構(gòu)圖如下:
以下各分章節(jié)將描述各個模塊詳細流程。
4.2、Client
Client詳細流程如下:
1)從配置文件加載Registry地址;
2)從Registy上Proxy注冊路徑/pubsub/proxy下獲取所有的Proxy,依據(jù)各個Proxy ID大小順序遞增組成一個ProxyArray;
3)啟動一個線程實時關(guān)注Registry路徑/pubsub/proxy,以獲取Proxy的動態(tài)變化,及時更新ProxyArray;
4)啟動一個線程定時輪詢獲取Registry路徑/pubsub/proxy下各個Proxy實例,作為關(guān)注策略的補充,以期本地ProxyArray內(nèi)各個Proxy成員與Registry上的各個Proxy保持一致;定時給各個Proxy發(fā)送心跳,異步獲取心跳回包;定時清除ProxyArray中心跳超時的Proxy成員;
5)發(fā)送消息的時候采用snowflake算法給每個消息分配一個MessageID,然后采用相關(guān)負載均衡算法把消息轉(zhuǎn)發(fā)給某個Proxy。
4.3、Proxy
Proxy詳細流程如下:
1)讀取配置文件,獲取Registry地址;
2)把自身信息注冊到Registry路徑/pubsub/proxy下,把Registry返回的ReplicaID作為自身ID;
3)從Registry路徑/pubsub/broker/partition(x)下獲取每個Broker Partition的各個replica;
4)從Registry路徑/pubsub/broker/partition_num獲取當前有效的Broker Partition Number;
5)啟動一個線程關(guān)注Registry上的Broker路徑/pubsub/broker,以實時獲取以下信息:
{Broker Partition Number}
- 新的Broker Partition(此時發(fā)生了擴容);
- Broker Partition內(nèi)新的broker replica(Partition內(nèi)發(fā)生了replica擴容);
- Broker Parition內(nèi)某replica掛掉的信息;
6)定時向各個Broker Partition replica發(fā)送心跳,異步等待Broker返回的心跳響應包,以探測其活性,以保證不向超時的replica轉(zhuǎn)發(fā)Room Message;
7)啟動一個線程定時讀取Registry上的Broker路徑/pubsub/broker下各個子節(jié)點的值,以定時輪詢的策略觀察Broker Partition Number變動,以及各Partition的變動情況,作為實時策略的補充;同時定時檢查心跳包超時的Broker,從有效的BrokerList中刪除;
8)依據(jù)規(guī)則【BrokerPartitionID = RoomID % BrokerPartitionNum, BrokerReplicaID = RoomID % BrokerPartitionReplicaNum】向某個Partition的replica轉(zhuǎn)發(fā)Room Message,收到Client的Heatbeat包時要及時給予響應。
之所以把Room Message和Heartbeat Message放在一個線程處理,是為了防止進程假死這種情況。
當/pubsub/broker/partition_num的值發(fā)生改變的時候(譬如值改為4),意味著Router Partition進行了擴展,Proxy要及時獲取新Partition路徑(如/pubsub/broker/Partition2和/pubsub/broker/Partition3)下的實例,并關(guān)注這些路徑,獲取新Partition下的實例。
之所以Proxy在獲取Registry下所有當前的Broker實例信息后再注冊自身信息,是因為此時它才具有轉(zhuǎn)發(fā)消息的資格。
Proxy轉(zhuǎn)發(fā)某個Room消息時候,只發(fā)送給處于Running狀態(tài)的Broker。為Broker Partition內(nèi)所有replica依據(jù)Registry給其分配的replicaID進行遞增排序,組成一個Broker Partition Replica Array,規(guī)則中BrokerPartitionReplicaNum為Array的size,而BrokerReplicaID為replica在Array中的下標。
4.4、Pipeline
收到的 Room Message 需要做三部工作:收取 Room Message、消息協(xié)議轉(zhuǎn)換和向 Broker 發(fā)送消息。
初始系統(tǒng)這三步流程如果均放在一個線程內(nèi)處理,proxy 的整體吞吐率只有 50 000 Msg/s。
最后的實現(xiàn)方式是按照消息處理的三個步驟以 pipeline 方式做如下流程處理:
1)啟動 1 個消息接收線程和 N【N == Broker Parition 數(shù)目】個多寫一讀形式的無鎖隊列【稱之為消息協(xié)議轉(zhuǎn)換隊列】,消息接收線程分別啟動一個 epoll 循環(huán)流程收取消息,然后把消息以相應的 hash 算法【隊列ID = UIN % N】寫入對應的消息協(xié)議轉(zhuǎn)換隊列;
2)啟動 N 個線程 和 N * 3 個一寫一讀的無鎖隊列【稱之為消息發(fā)送隊列】,每個消息協(xié)議專家線程從消息協(xié)議轉(zhuǎn)換隊列接收到消息并進行協(xié)議轉(zhuǎn)換后,根據(jù)相應的 hash 算法【隊列ID = UIN % 3N】寫入消息發(fā)送隊列;
3)啟動 3N 個消息發(fā)送線程,分別創(chuàng)建與之對應的 Broker 的連接,每個線程單獨從對應的某個消息發(fā)送隊列接收消息然后發(fā)送出去。
經(jīng)過以上流水線改造后,Proxy 的整體吞吐率可達 200 000 Msg/s。
關(guān)于 pipeline 自身的解釋,本文不做詳述,可以參考下圖:
4.5、大房間消息處理
每個 Room 的人數(shù)不均,最簡便的解決方法就是給不同人數(shù)量級的 Room 各搭建一套消息系統(tǒng),不用修改任何代碼。
然所謂需求推動架構(gòu)改進,在系統(tǒng)迭代升級過程中遇到了這樣一個需求:業(yè)務(wù)方有一個全國 Room,用于給所有在線用戶進行消息推送。針對這個需求,不可能為了一個這樣的 Room 單獨搭建一套系統(tǒng),況且這個 Room 的消息量很少。
如果把這個 Room 的消息直接發(fā)送給現(xiàn)有系統(tǒng),它有可能影響其他 Room 的消息發(fā)送:消息系統(tǒng)是一個寫放大的系統(tǒng),全國 Room 內(nèi)有系統(tǒng)所有的在線用戶,每次發(fā)送都會卡頓其他 Room 的消息發(fā)送。
最終的解決方案是:使用類似于分區(qū)的方法,把這樣的大 Room 映射為 64 個虛擬 Room【稱之為 VRoom】。在 Room 號段分配業(yè)務(wù)線的配合下,給消息系統(tǒng)專門保留了一個號段,用于這種大 Room 的切分,在 Proxy 層依據(jù)一個 hash 方法 【 VRoomID = UserID % 64】 把每個 User 分配到相應的 VRoom,其他模塊代碼不用修改即完成了大 Room 消息的路由。
4.6、Broker
Broker詳細流程如下:
1)Broker加載配置,獲取自身所在Partition的ID(假設(shè)為3);
2)向Registry路徑/pubsub/broker/partition3注冊,設(shè)置其狀態(tài)為Init,注冊中心返回的ID作為自身的ID(replicaID);
3)接收Router轉(zhuǎn)發(fā)來的Gateway Message,放入GatewayMessageQueue;
4)從Database加載數(shù)據(jù),把自身所在的Broker Partition所應該負責的 RoomGatewayList 數(shù)據(jù)加載進來;
5)異步處理GatewayMessageQueue內(nèi)的Gateway Message,只處理滿足規(guī)則【PartitionID == RoomID % PartitionNum】的消息,把數(shù)據(jù)存入本地路由信息緩存;
6)修改Registry路徑/pubsub/broker/partition3下自身節(jié)點的狀態(tài)為Running;
7)啟動線程實時關(guān)注Registry路徑/pubsub/broker/partition_num的值;
8)啟動線程定時查詢Registry路徑/pubsub/broker/partition_num的值;
9)當Registry路徑/pubsub/broker/partition_num的值發(fā)生改變的時候,依據(jù)規(guī)則【PartitionID == RoomID % PartitionNum】清洗本地路由信息緩存中每條數(shù)據(jù);
10)接收Proxy發(fā)來的Room Message,依據(jù)RoomID從路由信息緩存中查找Room有成員登陸的所有Gateway,把消息轉(zhuǎn)發(fā)給這些Gateway。
注意Broker之所以先注冊然后再加載Database中的數(shù)據(jù),是為了在加載數(shù)據(jù)的時候同時接收Router轉(zhuǎn)發(fā)來的Gateway Message,但是在數(shù)據(jù)加載完前這些受到的數(shù)據(jù)先被緩存起來,待所有 RoomGatewayList 數(shù)據(jù)加載完后就把這些數(shù)據(jù)重放一遍;
Broker之所以區(qū)分狀態(tài),是為了在加載完畢 RoomGatewayList 數(shù)據(jù)前不對Proxy提供轉(zhuǎn)發(fā)消息的服務(wù),同時也方便Broker Partition應對的消息量增大時進行水平擴展。
當Broker發(fā)生Partition擴展的時候,新的Partition個數(shù)必須是2的冪,只有新Partition內(nèi)所有Broker Replica都加載實例完畢,再更改/pubsub/broker/partition_num的值。
老的Broker也要watch路徑/pubsub/broker/partition_num的值,當這個值增加的時候,它也需要清洗本地的路由信息緩存。
Broker的擴容過程猶如細胞分裂,形成中的兩個細胞有著完全相同的數(shù)據(jù),分裂完成后【Registry路徑/pubsub/broker/partition_num的值翻倍】則需要清洗垃圾信息。這種方法稱為翻倍法。
4.7、Router
Router詳細流程如下:
1)Router加載配置,Registry地址;
2)把自身信息注冊到Registry路徑/pubsub/router下,把Registry返回的ReplicaID作為自身ID;
3)從Registry路徑/pubsub/broker/partition(x)下獲取每個Broker Partition的各個replica;
4)從Registry路徑/pubsub/broker/partition_num獲取當前有效的Broker Partition Number;
5)啟動一個線程關(guān)注Registry上的Broker路徑/pubsub/broker,以實時獲取以下信息:
{Broker Partition Number}
- 新的Broker Partition(此時發(fā)生了擴容);
- Broker Partition內(nèi)新的broker replica(Partition內(nèi)發(fā)生了replica擴容);
- Broker Parition內(nèi)某replica掛掉的信息;
6)定時向各個Broker Partition replica發(fā)送心跳,異步等待Broker返回的心跳響應包,以探測其活性,以保證不向超時的replica轉(zhuǎn)發(fā)Gateway Message;
7)啟動一個線程定時讀取Registry上的Broker路徑/pubsub/broker下各個子節(jié)點的值,以定時輪詢的策略觀察Broker Partition Number變動,以及各Partition的變動情況,作為實時策略的補充;同時定時檢查心跳包超時的Broker,從有效的BrokerList中刪除;
8)從Database全量加載路由 RoomGatewayList 數(shù)據(jù)放入本地緩存;
9)收取Gateway發(fā)來的心跳消息,及時返回ack包;
10)收取Gateway轉(zhuǎn)發(fā)來的Gateway Message,按照一定規(guī)則【BrokerPartitionID % BrokerPartitionNum = RoomID % BrokerPartitionNum】轉(zhuǎn)發(fā)給某個Broker Partition下所有Broker Replica,保證Partition下所有replica擁有同樣的路由 RoomGatewayList 數(shù)據(jù),再把Message內(nèi)數(shù)據(jù)存入本地緩存,當檢測到數(shù)據(jù)不重復的時候把數(shù)據(jù)異步寫入Database。
4.8、Gateway
Gateway詳細流程如下:
1)讀取配置文件,加載Registry地址;
2)從Registry路徑/pubsub/router/下獲取所有router replica,依據(jù)各Replica的ID遞增排序組成replica數(shù)組RouterArray;
3)啟動一個線程實時關(guān)注Registry路徑/pubsub/router,以獲取Router的動態(tài)變化,及時更新RouterArray;
4)啟動一個線程定時輪詢獲取Registry路徑/pubsub/router下各個Router實例,作為關(guān)注策略的補充,以期本地RouterArray及時更新;定時給各個Router發(fā)送心跳,異步獲取心跳回包;定時清除RouterArray中心跳超時的Router成員;
5)當有Room內(nèi)某成員客戶端連接上來或者Room內(nèi)所有成員都不連接當前Gateway節(jié)點時,依據(jù)規(guī)則【RouterArrayIndex = RoomID % RouterNum】向某個Router發(fā)送Gateway Message;
6)收到Broker轉(zhuǎn)發(fā)來的Room Message時,根據(jù)MessageID進行去重,如果不重復則把消息發(fā)送到連接到當前Gateway的Room內(nèi)所有客戶端,同時把MessageID緩存起來以用于去重判斷。
Gateway本地有一個基于共享內(nèi)存的LRU Cache,存儲最近一段時間發(fā)送的消息的MessageID。
五、接下來迫切要解決的:系統(tǒng)穩(wěn)定性
系統(tǒng)具有了可擴展性僅僅是系統(tǒng)可用的初步,整個系統(tǒng)要保證最低粒度的SLA(0.99),就必須在兩個維度對系統(tǒng)的可靠性就行感知:消息延遲和系統(tǒng)內(nèi)部組件的高可用。
5.1、消息延遲
準確的消息延遲的統(tǒng)計,通用的做法可以基于日志系統(tǒng)對系統(tǒng)所有消息或者以一定概率抽樣后進行統(tǒng)計,但限于人力目前沒有這樣做。
目前使用了一個方法:通過一種構(gòu)造一組偽用戶ID,定時地把消息發(fā)送給proxy,每條消息經(jīng)過一層就把在這層的進入時間和發(fā)出時間以及組件自身的一些信息填入消息,這組偽用戶的消息最終會被發(fā)送到一個偽Gateway端,偽Gateway對這些消息的信息進行歸并統(tǒng)計后,即可計算出當前系統(tǒng)的平均消息延遲時間。
通過所有消息的平均延遲可以評估系統(tǒng)的整體性能。同時,因為系統(tǒng)消息路由的哈希方式已知,當固定時間內(nèi)偽Gateway沒有收到消息時,就把消息當做發(fā)送失敗,當某條鏈路失敗一定次數(shù)后就可以產(chǎn)生告警了。
5.2、高可用
上面的方法同時能夠檢測某個鏈路是否出問題,但是鏈路具體出問題的點無法判斷,且實時性無法保證。
為了保證各個組件的高可用,系統(tǒng)引入了另一種評估方法:每個層次都給后端組件發(fā)送心跳包,通過心跳包的延遲和成功率判斷其下一級組件的當前的可用狀態(tài)。
譬如proxy定時給每個Partition內(nèi)每個broker發(fā)送心跳,可以依據(jù)心跳的成功率來快速判斷broker是否處于“假死”狀態(tài)(最近業(yè)務(wù)就遇到過broker進程還活著,但是對任何收到的消息都不處理的情況)。
同時依靠心跳包的延遲還可以判斷broker的處理能力,基于此延遲值可在同一Partition內(nèi)多broker端進行負載均衡。
六、進一步優(yōu)化:消息可靠性
公司內(nèi)部內(nèi)部原有一個走tcp通道的群聊消息系統(tǒng),但是經(jīng)過元旦一次大事故(幾乎全線崩潰)后,相關(guān)業(yè)務(wù)的一些重要消息改走這套基于UDP的群聊消息系統(tǒng)了。這些消息如服務(wù)端下達給客戶端的游戲動作指令,是不允許丟失的,但其特點是相對于聊天消息來說量非常小(單人1秒最多一個),所以需要在目前UDP鏈路傳遞消息的基礎(chǔ)之上再構(gòu)建一個可靠消息鏈路。
國內(nèi)某IM大廠的消息系統(tǒng)也是以UDP鏈路為基礎(chǔ)的(見《為什么QQ用的是UDP協(xié)議而不是TCP協(xié)議?》),他們的做法是消息重試加ack構(gòu)建了可靠消息穩(wěn)定傳輸鏈路。但是這種做法會降低系統(tǒng)的吞吐率,所以需要獨辟蹊徑。
UDP通信的本質(zhì)就是偽裝的IP通信,TCP自身的穩(wěn)定性無非是重傳、去重和ack,所以不考慮消息順序性的情況下可以通過重傳與去重來保證消息的可靠性。
基于目前系統(tǒng)的可靠消息傳輸流程如下:
1)Client給每個命令消息依據(jù)snowflake算法配置一個ID,復制三份,立即發(fā)送給不同的Proxy;
2)Proxy收到命令消息以后隨機發(fā)送給一個Broker;
3)Broker收到后傳輸給Gateway;
4)Gateway接收到命令消息后根據(jù)消息ID進行重復判斷,如果重復則丟棄,否則就發(fā)送給APP,并緩存之。
正常的消息在群聊消息系統(tǒng)中傳輸時,Proxy會根據(jù)消息的Room ID傳遞給固定的Broker,以保證消息的有序性。
七、Router需要進一步強化
7.1、簡述
當線上需要部署多套群聊消息系統(tǒng)的時候,Gateway需要把同樣的Room Message復制多份轉(zhuǎn)發(fā)給多套群聊消息系統(tǒng),會增大Gateway壓力,可以把Router單獨獨立部署,然后把Room Message向所有的群聊消息系統(tǒng)轉(zhuǎn)發(fā)。
Router系統(tǒng)原有流程是:Gateway按照Room ID把消息轉(zhuǎn)發(fā)給某個Router,然后Router把消息轉(zhuǎn)發(fā)給下游Broker實例。新部署一套群聊消息系統(tǒng)的時候,新系統(tǒng)Broker的schema需要通過一套約定機制通知Router,使得Router自身邏輯過于復雜。
重構(gòu)后的Router架構(gòu)參照上圖,也采用分Partition分Replica設(shè)計,Partition內(nèi)部各Replica之間采用non-leader機制;各Router Replica不會主動把Gateway Message內(nèi)容push給各Broker,而是各Broker主動通過心跳包形式向Router Partition內(nèi)某個Replica注冊,而后此Replica才會把消息轉(zhuǎn)發(fā)到這個Broker上。
類似于Broker,Router Partition也以2倍擴容方式進行Partition水平擴展,并通過一定機制保證擴容或者Partition內(nèi)部各個實例停止運行或者新啟動時,盡力保證數(shù)據(jù)的一致性。
Router Replica收到Gateway Message后,replica先把Gateway Message轉(zhuǎn)發(fā)給Partition內(nèi)各個peer replica,然后再轉(zhuǎn)發(fā)給各個訂閱者。Router轉(zhuǎn)發(fā)消息的同時異步把消息數(shù)據(jù)寫入Database。
獨立Router架構(gòu)下,下面小節(jié)將分別詳述Gateway、Router和Broker三個相關(guān)模塊的詳細流程。
7.2、Gateway
Gateway詳細流程如下:
1)從Registry路徑/pubsub/router/partition(x)下獲取每個Partition的各個replica;
2)從Registry路徑/pubsub/router/partition_num獲取當前有效的Router Partition Number;
3)啟動一個線程關(guān)注Registry上的Router路徑/pubsub/router,以實時獲取以下信息:{Router Partition Number} -> 新的Router Partition(此時發(fā)生了擴容); Partition內(nèi)新的replica(Partition內(nèi)發(fā)生了replica擴容); Parition內(nèi)某replica掛掉的信息;
4)定時向各個Partition replica發(fā)送心跳,異步等待Router返回的心跳響應包,以探測其活性,以保證不向超時的replica轉(zhuǎn)發(fā)Gateway Message;
5)啟動一個線程定時讀取Registry上的Router路徑/pubsub/router下各個子節(jié)點的值,以定時輪詢的策略觀察Router Partition Number變動,以及各Partition的變動情況,作為實時策略的補充;同時定時檢查心跳包超時的Router,從有效的BrokerList中刪除;
6 依據(jù)規(guī)則向某個Partition的replica轉(zhuǎn)發(fā)Gateway Message。
第六步的規(guī)則決定了Gateway Message的目的Partition和replica,規(guī)則內(nèi)容有:
如果某Router Partition ID滿足condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber),則把消息轉(zhuǎn)發(fā)到此Partition;
這里之所以不采用直接hash方式(RouterPartitionID = RoomID % RouterPartitionNumber)獲取Router Partition,是考慮到當Router進行2倍擴容的時候當所有新的Partition的所有Replica都啟動完畢且數(shù)據(jù)一致時才會修改Registry路徑/pubsub/router/partitionnum的值,按照規(guī)則的計算公式才能保證新Partition的各個Replica在啟動過程中就可以得到Gateway Message,也即此時每個Gateway Message會被發(fā)送到兩個Router Partition。 當Router擴容完畢,修改Registry路徑/pubsub/router/partitionnum的值后,此時新集群進入穩(wěn)定期,每個Gateway Message只會被發(fā)送固定的一個Partition,condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber)等效于condition(RouterPartitionID = RoomID % RouterPartitionNumber)。
如果Router Partition內(nèi)某replia滿足condition(replicaPartitionID = RoomID % RouterPartitionReplicaNumber),則把消息轉(zhuǎn)發(fā)到此replica。
replica向Registry注冊的時候得到的ID稱之為replicaID,Router Parition內(nèi)所有replica按照replicaID遞增排序組成replica數(shù)組RouterPartitionReplicaArray,replicaPartitionID即為replica在數(shù)組中的下標。
Gateway Message數(shù)據(jù)一致性:
Gateway向Router發(fā)送的Router Message內(nèi)容有兩種:某user在當前Gateway上進入某Room 和 某user在當前Gateway上退出某Room,數(shù)據(jù)項分別是UIN(用戶ID)、Room ID、Gateway Addr和User Action(Login or Logout。
由于所有消息都是走UDP鏈路進行轉(zhuǎn)發(fā),則這些消息的順序就有可能亂序。Gateway可以統(tǒng)一給其發(fā)出的所有消息分配一個全局遞增的ID【下文稱為GatewayMsgID,Gateway Message ID】以保證消息的唯一性和全局有序性。
Gateway向Registry注冊臨時有序節(jié)點時,Registry會給Gateway分配一個ID,Gateway可以用這個ID作為自身的Instance ID【假設(shè)這個ID上限是65535】。
GatewayMsgID字長是64bit,其格式如下:
//63 -------------------------- 48 47 -------------- 38 37 ------------ 0
//| 16bit Gateway Instance ID | 10bit Reserve | 38bit自增碼 |
7.3、Router
Router系統(tǒng)部署之前,先設(shè)置Registry路徑/pubsub/router/partition_num的值為1。
Router詳細流程如下:
1)Router加載配置,獲取自身所在Partition的ID(假設(shè)為3);
2)向Registry路徑/pubsub/router/partition3注冊,設(shè)置其狀態(tài)為Init,注冊中心返回的ID作為自身的ID(replicaID);
3)注冊完畢會收到Gateway發(fā)來的Gateway Message以及Broker發(fā)來的心跳消息(HeartBeat Message),先緩存到消息隊列MessageQueue;
4)從Registry路徑/pubsub/router/partition3下獲取自身所在的Partition內(nèi)的各個replica;
5)從Registry路徑/pubsub/router/partition_num獲取當前有效的Router Partition Number;
6)啟動一個線程關(guān)注Registry路徑/pubsub/router,以實時獲取以下信息:{Router Partition Number} -> Partition內(nèi)新的replica(Partition內(nèi)發(fā)生了replica擴容); Parition內(nèi)某replica掛掉的信息;
7)從Database加載數(shù)據(jù);
8)啟動一個線程異步處理MessageQueue內(nèi)的Gateway Message,把Gateway Message轉(zhuǎn)發(fā)給同Partition內(nèi)其他peer replica,然后依據(jù)規(guī)則【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】轉(zhuǎn)發(fā)給BrokerList內(nèi)每個Broker;處理Broker發(fā)來的心跳包,把Broker的信息存入本地BrokerList,然后給Broker發(fā)送回包;
9)修改Registry路徑/pubsub/router/partition3下節(jié)點的狀態(tài)為Running;
10)啟動一個線程定時讀取Registry路徑/pubsub/router下各個子路徑的值,以定時輪詢的策略觀察Router各Partition的變動情況,作為實時策略的補充;檢查超時的Broker,把其從BrokerList中剔除;
11)當RouterPartitionNum倍增時,Router依據(jù)規(guī)則【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】清洗自身路由信息緩存中數(shù)據(jù);
12)Router本地存儲每個Gateway的最大GatewayMsgID,收到小于GatewayMsgID的Gateway Message可以丟棄不處理,否則就更新GatewayMsgID并根據(jù)上面邏輯進行處理。
之所以把Gateway Message和Heartbeat Message放在一個線程處理,是為了防止進程假死這種情況。
Broker也采用了分Partition分Replica機制,所以向Broker轉(zhuǎn)發(fā)Gateway Message時候路由規(guī)則,與Gateway向Router轉(zhuǎn)發(fā)消息的路由規(guī)則相同。
另外啟動一個工具,當水平擴展后新啟動的Partition內(nèi)所有Replica的狀態(tài)都是Running的時候,修改Registry路徑/pubsub/router/partition_num的值為所有Partition的數(shù)目。
7.4、Broker
Broker詳細流程如下:
1)Broker加載配置,獲取自身所在Partition的ID(假設(shè)為3);
2)向Registry路徑/pubsub/broker/partition3注冊,設(shè)置其狀態(tài)為Init,注冊中心返回的ID作為自身的ID(replicaID);
3)從Registry路徑/pubsub/router/partition_num獲取當前有效的Router Partition Number;
4)從Registry路徑/pubsub/router/partition(x)下獲取每個Router Partition的各個replica;
5)啟動一個線程關(guān)注Registry路徑/pubsub/router,以實時獲取以下信息:{Router Partition Number} -> 新的Router Partition(此時發(fā)生了擴容); Partition內(nèi)新的replica(Partition內(nèi)發(fā)生了replica擴容); Parition內(nèi)某replica掛掉的信息;
6)依據(jù)規(guī)則【RouterPartitionID % BrokerPartitionNum == BrokerPartitionID % BrokerPartitionNum,RouterReplicaID = BrokerReplicaID % BrokerPartitionNum】選定目標Router Partition下某個Router replica,向其發(fā)送心跳消息,包含BrokerPartitionNum、BrokerPartitionID、BrokerHostAddr和精確到秒級的Timestamp,并異步等待所有Router replica的回復,所有Router轉(zhuǎn)發(fā)來的Gateway Message放入GatewayMessageQueue;
7)依據(jù)規(guī)則【BrokerPartitionID == RoomID % BrokerParitionNum】從Database加載數(shù)據(jù);
8)依據(jù)規(guī)則【BrokerPartitionID % BrokerParitionNum == RoomID % BrokerParitionNum】異步處理GatewayMessageQueue內(nèi)的Gateway Message,只留下合乎規(guī)則的消息的數(shù)據(jù);
9)修改Registry路徑/pubsub/broker/partition3下自身節(jié)點的狀態(tài)為Running;
10)啟動一個線程定時讀取Registry路徑/pubsub/router下各個子路徑的值,以定時輪詢的策略觀察Router各Partition的變動情況,作為實時策略的補充;定時檢查超時的Router,某Router超時后更換其所在的Partition內(nèi)其他Router替換之,定時發(fā)送心跳包;
11)當Registry路徑/pubsub/broker/partition_num的值BrokerPartitionNum發(fā)生改變的時候,依據(jù)規(guī)則【PartitionID == RoomID % PartitionNum】清洗本地路由信息緩存中每條數(shù)據(jù);
12)接收Proxy發(fā)來的Room Message,依據(jù)RoomID從路由信息緩存中查找Room有成員登陸的所有Gateway,把消息轉(zhuǎn)發(fā)給這些Gateway;
13)Broker本地存儲每個Gateway的最大GatewayMsgID,收到小于GatewayMsgID的Gateway Message可以丟棄不處理,否則更新GatewayMsgID并根據(jù)上面邏輯進行處理。
BrokerPartitionNumber可以小于或者等于或者大于RouterPartitionNumber,兩個數(shù)應該均是2的冪,兩個集群可以分別進行擴展,互不影響。譬如BrokerPartitionNumber=4而RouterPartitionNumber=2,則Broker Partition 3只需要向Router Partition 1的某個follower發(fā)送心跳消息即可;若BrokerPartitionNumber=4而RouterPartitionNumber=8,則Broker Partition 3需要向Router Partition 3的某個follower發(fā)送心跳消息的同時,還需要向Router Partition 7的某個follower發(fā)送心跳,以獲取全量的Gateway Message。
Broker需要關(guān)注/pubsub/router/partitionnum和/pubsub/broker/partitionnum的值的變化,當router或者broker進行parition水平擴展的時候,Broker需要及時重新構(gòu)建與Router之間的對應關(guān)系,及時變動發(fā)送心跳的Router Replica對象【RouterPartitionID = BrokerReplicaID % RouterPartitionNum,RouterPartitionID為Router Replica在PartitionRouterReplicaArray數(shù)組的下標】。
當Router Partition內(nèi)replica死掉或者發(fā)送心跳包的replica對象死掉(無論是注冊中心通知還是心跳包超時),broker要及時變動發(fā)送心跳的Router replica對象。
另外,Gateway使用UDP通信方式向Router發(fā)送Gateway Message,如若這個Message丟失則此Gateway上該Room內(nèi)所有成員一段時間內(nèi)(當有新的成員在當前Gateway上加入Room 時會產(chǎn)生新的Gateway Message)都無法再接收消息,為了保證消息的可靠性,可以使用這樣一個約束解決問題:在此Gateway上登錄的某Room內(nèi)的人數(shù)少于3時,Gateway會把Gateway Message復制兩份非連續(xù)(如以10ms為時間間隔)重復發(fā)送給某個Partition leader。因Gateway Message消息處理的冪等性,重復Gateway Message并不會導致Room Message發(fā)送錯誤,只在極少概率的情況下會導致Gateway收到消息的時候Room內(nèi)已經(jīng)沒有成員在此Gateway登錄,此時Gateway會把消息丟棄不作處理。
傳遞實時消息群聊消息系統(tǒng)的Broker向特定Gateway轉(zhuǎn)發(fā)Room Message的時候,會帶上Room內(nèi)在此Gateway上登錄的用戶列表,Gateway根據(jù)這個用戶列表下發(fā)消息時如果檢測到此用戶已經(jīng)下線,在放棄向此用戶轉(zhuǎn)發(fā)消息的同時,還應該把此用戶已經(jīng)下線的消息發(fā)送給Router,當Router把這個消息轉(zhuǎn)發(fā)給Broker后,Broker把此用戶從用戶列表中剔除。通過這種負反饋機制保證用戶狀態(tài)更新的及時性。
八、離線消息的處理
8.1、簡述
前期的系統(tǒng)只考慮了用戶在線情況下實時消息的傳遞,當用戶離線時其消息便無法獲取。
若系統(tǒng)考慮用戶離線消息傳遞,需要考慮如下因素:
1)消息固化:保證用戶上線時收到其離線期間的消息;
2)消息有序:離線消息和在線消息都在一個消息系統(tǒng)傳遞,給每個消息分配一個ID以區(qū)分消息先后順序,消息順序越靠后則ID愈大。
離線消息的存儲和傳輸,需要考慮用戶的狀態(tài)以及每條消息的發(fā)送狀態(tài),整個消息核心鏈路流程會有大的重構(gòu)。
新消息架構(gòu)如下圖:
系統(tǒng)名詞解釋:
1)Pi : 消息ID存儲模塊,存儲每個人未發(fā)送的消息ID有序遞增集合;
2)Xiu : 消息存儲KV模塊,存儲每個人的消息,給每個消息分配ID,以ID為key,以消息內(nèi)為value;
3)Gateway Message(HB) : 用戶登錄登出消息,包括APP保活定時心跳(Hearbeat)消息。
系統(tǒng)內(nèi)部代號貔貅(貔貅者,雄貔雌貅),源自上面兩個新模塊。
這個版本架構(gòu)流程的核心思想為“消息ID與消息內(nèi)容分離,消息與用戶狀態(tài)分離”。消息發(fā)送流程涉及到模塊 Client/Proxy/Pi/Xiu,消息推送流程則涉及到模塊 Pi/Xiu/Broker/Router/Gateway。
下面小節(jié)先細述Pi和Xiu的接口,然后再詳述發(fā)送和推送流程。
8.2、Xiu
Xiu模塊功能名稱是Message Storage,用戶緩存和固化消息,并給消息分配ID。Xiu 集群采用分 Partition 分 Replica 機制,Partition 初始數(shù)目須是2的倍數(shù),集群擴容時采用翻倍法。
8.2.1 存儲消息
存儲消息請求的參數(shù)列表為{SnowflakeID,UIN, Message},其流程如下:
1)接收客戶端發(fā)來的消息,獲取消息接收人ID(UIN)和客戶端給消息分配的 SnowflakeID;
2)檢查 UIN % Xiu_Partition_Num == Xiu_Partition_ID % Xiu_Partition_Num 添加是否成立【即接收人的消息是否應當由當前Xiu負責】,不成立則返回錯誤并退出;
3)檢查 SnowflakeID 對應的消息是否已經(jīng)被存儲過,若已經(jīng)存儲過則返回其對應的消息ID然后退出;
4)給消息分配一個 MsgID:
每個Xiu有自己唯一的 Xiu_Partition_ID,以及一個初始值為 0 的 Partition_Msg_ID。MsgID = 1B[ Xiu_Partition_ID ] + 1B[ Message Type ] + 6B[ ++ Partition_Msg_ID ]。每次分配的時候 Partition_Msg_ID 都自增加一。
5)以 MsgID 為 key 把消息存入基于共享內(nèi)存的 Hashtable,并存入消息的 CRC32 hash值和插入時間,把 MsgID 存入一個 LRU list 中:
LRU List 自身并不存入共享內(nèi)存中,當進程重啟時,可以根據(jù)Hashtable中的數(shù)據(jù)重構(gòu)出這個List。把消息存入 Hashtable 中時,如果 Hashtable full,則依據(jù) LRU List 對Hashtable 中的消息進行淘汰。
6)把MsgID返回給客戶端;
7)把MsgID異步通知給消息固化線程,消息固化線程根據(jù)MsgID從Hashtable中讀取消息并根據(jù)CRC32 hash值判斷消息內(nèi)容是否完整,完整則把消息存入本地RocksDB中。
8.2.2讀取消息
讀取消息請求的參數(shù)列表為{UIN, MsgIDList},其流程為:
1)獲取請求的 MsgIDList,判斷每個MsgID MsgID{Xiu_Partition_ID} == Xiu_Partition_ID 條件是否成立,不成立則返回錯誤并退出;
2)從 Hashtable 中獲取每個 MsgID 對應的消息;
3)如果 Hashtable 中不存在,則從 RocksDB 中讀取 MsgID 對應的消息;
4)讀取完畢則把所有獲取的消息返回給客戶端。
8.2.3主從數(shù)據(jù)同步
目前從簡,暫定Xiu的副本只有一個。
Xiu節(jié)點啟動的時候根據(jù)自身配置文件中分配的 Xiu_Partition_ID 到Registry路徑 /pubsub/xiu/partition_id 下進行注冊一個臨時有序節(jié)點,注冊成功則Registry會返回Xiu的節(jié)點 ID。
Xiu節(jié)點獲取 /pubsub/xiu/partition_id 下的所有節(jié)點的ID和地址信息,依據(jù) 節(jié)點ID最小者為leader 的原則,即可判定自己的角色。只有l(wèi)eader可接受讀寫數(shù)據(jù)請求。
數(shù)據(jù)同步流程如下:
1)follower定時向leader發(fā)送心跳信息,心跳信息包含本地最新消息的ID;
2)leader啟動一個數(shù)據(jù)同步線程處理follower的心跳信息,leader的數(shù)據(jù)同步線程從LRU list中查找 follower_latest_msg_id 之后的N條消息的ID,若獲取到則讀取消息并同步給follower,獲取不到則回復其與leader之間消息差距太大;
3)follower從leader獲取到最新一批消息,則存儲之;
4)follower若獲取leader的消息差距太大響應,則請求leader的agent把RocksDB的固化數(shù)據(jù)全量同步過來,整理完畢后再次啟動與leader之間的數(shù)據(jù)同步流程。
follower會關(guān)注Registry路徑 /pubsub/xiu/partition_id 下所有所有節(jié)點的變化情況,如果leader掛掉則及時轉(zhuǎn)換身份并接受客戶端請求。如果follower 與 leader 之間的心跳超時,則follower刪掉 leader 的 Registry 路徑節(jié)點,及時進行身份轉(zhuǎn)換處理客戶端請求。
當leader重啟或者follower轉(zhuǎn)換為leader的時候,需要把 Partition_Msg_ID 進行一個大數(shù)值增值(譬如增加1000)以防止可能的消息ID亂序情況。
8.2.4集群擴容
Xiu 集群擴容采用翻倍法,擴容時新 Partition 的節(jié)點啟動后工作流程如下:
1)向Registry的路徑 /pubsub/xiu/partition_id 下自己的 node 的 state 為 running,同時注冊自己的對外服務(wù)地址信息;
2)另外啟動一個工具,當水平擴展后所有新啟動的 Partition 內(nèi)所有 Replica 的狀態(tài)都是 Running 的時候,修改 Registry 路徑 /pubsub/xiu/partition_num 的值為擴容后 Partition 的數(shù)目。按照開頭的例子,即由2升級為4。
之所以 Xiu 不用像 Broker 和 Router 那樣啟動的時候向老的 Partition 同步數(shù)據(jù),是因為每個 Xiu 分配的 MsgID 中已經(jīng)帶有 Xiu 的 PartitionID 信息,即使集群擴容這個 ID 也不變,根據(jù)這個ID也可以定位到其所在的Partition,而不是借助 hash 方法。
8.3、Pi
Pi 模塊功能名稱是 Message ID Storage,存儲每個用戶的 MsgID List。Xiu 集群也采用分 Partition 分 Replica 機制,Partition 初始數(shù)目須是2的倍數(shù),集群擴容時采用翻倍法。
8.3.1存儲消息ID
MsgID 存儲的請求參數(shù)列表為{UIN,MsgID},Pi 工作流程如下:
1)判斷條件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立則返回error退出;
2)把 MsgID 插入UIN的 MsgIDList 中,保持 MsgIDList 中所有 MsgID 不重復有序遞增,把請求內(nèi)容寫入本地log,給請求者返回成功響應。
Pi有專門的日志記錄線程,給每個日志操作分配一個 LogID,每個 Log 文件記錄一定量的寫操作,當文件 size 超過配置的上限后刪除之。
8.3.2讀取消息ID列表
讀取請求參數(shù)列表為{UIN, StartMsgID, MsgIDNum, ExpireFlag},其意義為獲取用戶 UIN 自起始ID為 StartMsgID 起(不包括 StartMsgID )的數(shù)目為 MsgIDNum 的消息ID列表,ExpireFlag意思是 所有小于等于 StartMsgID 的消息ID是否刪除。
流程如下:
1)判斷條件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立則返回error退出;
2)獲取 (StartID, StartMsgID + MsgIDNum] 范圍內(nèi)的所有 MsgID,把結(jié)果返回給客戶端;
3)如果 ExpireFlag 有效,則刪除MsgIDList內(nèi)所有在 [0, StartMsgID] 范圍內(nèi)的MsgID,把請求內(nèi)容寫入本地log。
8.3.3主從數(shù)據(jù)同步
同 Xiu 模塊,暫定 Pi 的同 Parition 副本只有一個。
Pi 節(jié)點啟動的時候根據(jù)自身配置文件中分配的 Pi_Partition_ID 到Registry路徑 /pubsub/pi/partition_id 下進行注冊一個臨時有序節(jié)點,注冊成功則 Registry 會返回 Pi 的節(jié)點 ID。
Pi 節(jié)點獲取 /pubsub/pi/partition_id 下的所有節(jié)點的ID和地址信息,依據(jù) 節(jié)點ID最小者為leader 的原則,即可判定自己的角色。只有 leader 可接受讀寫數(shù)據(jù)請求。
數(shù)據(jù)同步流程如下:
1)follower 定時向 leader 發(fā)送心跳信息,心跳信息包含本地最新 LogID;
2)leader 啟動一個數(shù)據(jù)同步線程處理 follower 的心跳信息,根據(jù) follower 匯報的 logID 把此 LogID;
3)follower 從 leader 獲取到最新一批 Log,先存儲然后重放。
follower 會關(guān)注Registry路徑 /pubsub/pi/partition_id 下所有節(jié)點的變化情況,如果 leader 掛掉則及時轉(zhuǎn)換身份并接受客戶端請求。如果follower 與 leader 之間的心跳超時,則follower刪掉 leader 的 Registry 路徑節(jié)點,及時進行身份轉(zhuǎn)換處理客戶端請求。
8.3.4集群擴容
Pi 集群擴容采用翻倍法。則節(jié)點啟動后工作流程如下:
1)向 Registry 注冊,獲取 Registry 路徑 /pubsub/xiu/partition_num 的值 PartitionNumber;
2)如果發(fā)現(xiàn)自己 PartitionID 滿足條件 PartitionID >= PartitionNumber 時,則意味著當前 Partition 是擴容后的新集群,更新 Registry 中自己狀態(tài)為start;
3)讀取 Registry 路徑 /pubsub/xiu 下所有 Parition 的 leader,根據(jù)條件 自身PartitionID % PartitionNumber == PartitionID % PartitionNumber 尋找對應的老 Partition 的 leader,稱之為 parent_leader;
4)緩存收到 Proxy 轉(zhuǎn)發(fā)來的用戶請求;
5)向 parent_leader 獲取log;
6)向 parent_leader 同步內(nèi)存數(shù)據(jù);
7)重放 parent_leader 的log;
8)更新 Registry 中自己的狀態(tài)為 Running;
9)重放用戶請求;
10)當 Registry 路徑 /pubsub/xiu/partition_num 的值 PartitionNumber 滿足條件 PartitionID >= PartitionNumber 時,意味著擴容完成,處理用戶請求時要給用戶返回響應。
Proxy 會把讀寫請求參照條件 UIN % Pi\_Partition\_Num == Pi\_Partition\_ID % Pi\_Partition\_Num 向相關(guān) partition 的 leader 轉(zhuǎn)發(fā)用戶請求。假設(shè)原來 PartitionNumber 值為2,擴容后值為4,則原來轉(zhuǎn)發(fā)給 partition0 的寫請求現(xiàn)在需同時轉(zhuǎn)發(fā)給 partition0 和 partition2,原來轉(zhuǎn)發(fā)給 partition1 的寫請求現(xiàn)在需同時轉(zhuǎn)發(fā)給 partition1 和 partition3。
另外啟動一個工具,當水平擴展后所有新啟動的 Partition 內(nèi)所有 Replica 的狀態(tài)都是 Running 的時候,修改Registry路徑/pubsub/xiu/partition_num的值為擴容后 Partition 的數(shù)目。
8.4、數(shù)據(jù)發(fā)送流程
消息自 PiXiu 的外部客戶端(Client,服務(wù)端所有使用 PiXiu 提供的服務(wù)者統(tǒng)稱為客戶端)按照一定負載均衡規(guī)則發(fā)送到 Proxy,然后存入 Xiu 中,把 MsgID 存入 Pi 中。
其詳細流程如下:
1)Client 依據(jù) snowflake 算法給消息分配 SnowflakeID,依據(jù) ProxyID = UIN % ProxyNum 規(guī)則把消息發(fā)往某個 Proxy;
2)Proxy 收到消息后轉(zhuǎn)發(fā)到 Xiu;
3)Proxy 收到 Xiu 返回的響應后,把響應轉(zhuǎn)發(fā)給 Client;
4)如果 Proxy 收到 Xiu 返回的響應帶有 MsgID,則發(fā)起 Pi 寫流程,把 MsgID 同步到 Pi 中;
5)如果 Proxy 收到 Xiu 返回的響應帶有 MsgID,則給 Broker 發(fā)送一個 Notify,告知其某 UIN 的最新 MsgID。
8.5、數(shù)據(jù)轉(zhuǎn)發(fā)流程
轉(zhuǎn)發(fā)消息的主體是Broker,原來的在線消息轉(zhuǎn)發(fā)流程是它收到 Proxy 轉(zhuǎn)發(fā)來的 Message,然后根據(jù)用戶是否在線然后轉(zhuǎn)發(fā)給 Gateway。
PiXiu架構(gòu)下 Broker 會收到以下類型消息:
1)用戶登錄消息;
2)用戶心跳消息;
3)用戶登出消息;
4)Notify 消息;
5)Ack 消息。
Broker流程受這五種消息驅(qū)動,下面分別詳述其收到這五種消息時的處理流程。
用戶登錄消息流程如下:
1)檢查用戶的當前狀態(tài),若為 OffLine 則把其狀態(tài)值為在線 OnLine;
2)檢查用戶的待發(fā)送消息隊列是否為空,不為空則退出;
3)向 Pi 模塊發(fā)送獲取 N 條消息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設(shè)置用戶狀態(tài)為 GettingMsgIDList 并等待回應;
4)根據(jù) Pi 返回的消息 ID 隊列,向 Xiu 發(fā)起獲取消息請求 {UIN: uin, MsgIDList: msg ID List},設(shè)置用戶狀態(tài)為 GettingMsgList 并等待回應;
5)Xiu 返回消息列表后,設(shè)置狀態(tài)為 SendingMsg,并向 Gateway 轉(zhuǎn)發(fā)消息。
可以把用戶心跳消息當做用戶登錄消息處理。
Gateway的用戶登出消息產(chǎn)生有三種情況:
1)用戶主動退出;
2)用戶心跳超時;
3)給用戶轉(zhuǎn)發(fā)消息時發(fā)生網(wǎng)絡(luò)錯誤。
用戶登出消息處理流程如下:
1)檢查用戶狀態(tài),如果為 OffLine,則退出;
2)用戶狀態(tài)不為 OffLine 且檢查用戶已經(jīng)發(fā)送出去的消息列表的最后一條消息的 ID(LastMsgID),向 Pi 發(fā)送獲取 MsgID 請求{UIN: uin, StartMsgID: LastMsgID, MsgIDNum: 0, ExpireFlag: True},待 Pi 返回響應后退出。
處理 Proxy 發(fā)來的 Notify 消息處理流程如下:
1)如果用戶狀態(tài)為 OffLine,則退出;
2)更新用戶的最新消息 ID(LatestMsgID),如果用戶發(fā)送消息隊列不為空則退出;
3)向 Pi 模塊發(fā)送獲取 N 條消息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設(shè)置用戶狀態(tài)為 GettingMsgIDList 并等待回應;
4)根據(jù) Pi 返回的消息 ID 隊列,向 Xiu 發(fā)起獲取消息請求 {UIN: uin, MsgIDList: msg ID List},設(shè)置用戶狀態(tài)為 GettingMsgList 并等待回應;
5)Xiu 返回消息列表后,設(shè)置狀態(tài)為 SendingMsg,并向 Gateway 轉(zhuǎn)發(fā)消息。
所謂 Ack 消息,就是 Broker 經(jīng) Gateway 把消息轉(zhuǎn)發(fā)給 App 后,App 給Broker的消息回復,告知Broker其最近成功收到消息的 MsgID。
Ack 消息處理流程如下:
1)如果用戶狀態(tài)為 OffLine,則退出;
2)更新 LatestAckMsgID 的值;
3)如果用戶發(fā)送消息隊列不為空,則發(fā)送下一個消息后退出;
4)如果 LatestAckMsgID >= LatestMsgID,則退出;
5)向 Pi 模塊發(fā)送獲取 N 條消息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設(shè)置用戶狀態(tài)為 GettingMsgIDList 并等待回應;
6)根據(jù) Pi 返回的消息 ID 隊列,向 Xiu 發(fā)起獲取消息請求 {UIN: uin, MsgIDList: msg ID List},設(shè)置用戶狀態(tài)為 GettingMsgList 并等待回應;
7)Xiu 返回消息列表后,設(shè)置狀態(tài)為 SendingMsg,并向 Gateway 轉(zhuǎn)發(fā)消息。
總體上,PiXiu 轉(zhuǎn)發(fā)消息流程采用拉取(pull)轉(zhuǎn)發(fā)模型,以上面五種消息為驅(qū)動進行狀態(tài)轉(zhuǎn)換,并作出相應的動作行為。
九、本文總結(jié)
這套群聊消息系統(tǒng)尚有以下task list需完善:
1)消息以UDP鏈路傳遞,不可靠【2018/01/29解決之】;
2)目前的負載均衡算法采用了極簡的RoundRobin算法,可以根據(jù)成功率和延遲添加基于權(quán)重的負載均衡算法實現(xiàn);
3)只考慮傳遞,沒有考慮消息的去重,可以根據(jù)消息ID實現(xiàn)這個功能【2018/01/29解決之】;
4)各個模塊之間沒有考慮心跳方案,整個系統(tǒng)的穩(wěn)定性依賴于Registry【2018/01/17解決之】;
5)離線消息處理【2018/03/03解決之】;
6)區(qū)分消息優(yōu)先級。
此記。
參考文檔:《一種支持自由規(guī)劃無須數(shù)據(jù)遷移和修改路由代碼的Replicaing擴容方案》
十、本文成文歷程
于雨氏,2017/12/31,初作此文于豐臺金箱堂。
于雨氏,2018/01/16,于海淀添加“系統(tǒng)穩(wěn)定性”一節(jié)。
于雨氏,2018/01/29,于海淀添加“消息可靠性”一節(jié)。
于雨氏,2018/02/11,于海淀添加“Router”一節(jié),并重新格式化全文。
于雨氏,2018/03/05,于海淀添加“PiXiu”一節(jié)。
于雨氏,2018/03/14,于海淀添加負反饋機制、根據(jù)Gateway Message ID保證Gateway Message數(shù)據(jù)一致性 和 Gateway用戶退出消息產(chǎn)生機制 等三個細節(jié)。
于雨氏,2018/08/05,于海淀添加 “pipeline” 一節(jié)。
于雨氏,2018/08/28,于海淀添加 “大房間消息處理” 一節(jié)。
附錄:更多IM架構(gòu)設(shè)計文章
《簡述移動端IM開發(fā)的那些坑:架構(gòu)設(shè)計、通信協(xié)議和客戶端》
《一套海量在線用戶的移動端IM架構(gòu)設(shè)計實踐分享(含詳細圖文)》
《一套原創(chuàng)分布式即時通訊(IM)系統(tǒng)理論架構(gòu)方案》
《從零到卓越:京東客服即時通訊系統(tǒng)的技術(shù)架構(gòu)演進歷程》
《蘑菇街即時通訊/IM服務(wù)器開發(fā)之架構(gòu)選擇》
《騰訊QQ1.4億在線用戶的技術(shù)挑戰(zhàn)和架構(gòu)演進之路PPT》
《微信后臺基于時間序的海量數(shù)據(jù)冷熱分級架構(gòu)設(shè)計實踐》
《微信技術(shù)總監(jiān)談架構(gòu):微信之道——大道至簡(演講全文)》
《如何解讀《微信技術(shù)總監(jiān)談架構(gòu):微信之道——大道至簡》》
《快速裂變:見證微信強大后臺架構(gòu)從0到1的演進歷程(一)》
《17年的實踐:騰訊海量產(chǎn)品的技術(shù)方法論》
《移動端IM中大規(guī)模群消息的推送如何保證效率、實時性?》
《現(xiàn)代IM系統(tǒng)中聊天消息的同步和存儲方案探討》
《IM開發(fā)基礎(chǔ)知識補課(二):如何設(shè)計大量圖片文件的服務(wù)端存儲架構(gòu)?》
《IM開發(fā)基礎(chǔ)知識補課(三):快速理解服務(wù)端數(shù)據(jù)庫讀寫分離原理及實踐建議》
《IM開發(fā)基礎(chǔ)知識補課(四):正確理解HTTP短連接中的Cookie、Session和Token》
《WhatsApp技術(shù)實踐分享:32人工程團隊創(chuàng)造的技術(shù)神話》
《微信朋友圈千億訪問量背后的技術(shù)挑戰(zhàn)和實踐總結(jié)》
《王者榮耀2億用戶量的背后:產(chǎn)品定位、技術(shù)架構(gòu)、網(wǎng)絡(luò)方案等》
《IM系統(tǒng)的MQ消息中間件選型:Kafka還是RabbitMQ?》
《騰訊資深架構(gòu)師干貨總結(jié):一文讀懂大型分布式系統(tǒng)設(shè)計的方方面面》
《以微博類應用場景為例,總結(jié)海量社交系統(tǒng)的架構(gòu)設(shè)計步驟》
《快速理解高性能HTTP服務(wù)端的負載均衡技術(shù)原理》
《子彈短信光鮮的背后:網(wǎng)易云信首席架構(gòu)師分享億級IM平臺的技術(shù)實踐》
《知乎技術(shù)分享:從單機到2000萬QPS并發(fā)的Redis高性能緩存實踐之路》
《IM開發(fā)基礎(chǔ)知識補課(五):通俗易懂,正確理解并用好MQ消息隊列》
《微信技術(shù)分享:微信的海量IM聊天消息序列號生成實踐(算法原理篇)》
《微信技術(shù)分享:微信的海量IM聊天消息序列號生成實踐(容災方案篇)》
《新手入門:零基礎(chǔ)理解大型分布式架構(gòu)的演進歷史、技術(shù)原理、最佳實踐》
《一套高可用、易伸縮、高并發(fā)的IM群聊架構(gòu)方案設(shè)計實踐》
>> 更多同類文章 ……
(本文同步發(fā)布于:http://www.52im.net/thread-2015-1-1.html)
作者:Jack Jiang (點擊作者姓名進入Github)
出處:http://www.52im.net/space-uid-1.html
交流:歡迎加入即時通訊開發(fā)交流群 215891622
討論:http://www.52im.net/
Jack Jiang同時是【原創(chuàng)Java
Swing外觀工程BeautyEye】和【輕量級移動端即時通訊框架MobileIMSDK】的作者,可前往下載交流。
本博文
歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明出處(也可前往 我的52im.net 找到我)。