MQTT協(xié)議筆記之消息流
前言
前面的筆記已把所有消息類型都過了一遍,這里從消息流的角度嘗試解讀一下。
網(wǎng)絡(luò)故障
在任何網(wǎng)絡(luò)環(huán)境下,都會(huì)出現(xiàn)一方連接失敗,比如離開公司大門那一刻沒有了WIFI信號(hào)。但持續(xù)連接的另一端-服務(wù)器可能不能立即知道對(duì)方已斷開。類似網(wǎng)絡(luò)異常情況,都有可能在消息發(fā)送的過程中出現(xiàn),消息發(fā)送出去,就丟失了。
MQTT協(xié)議假定客戶端和服務(wù)器端穩(wěn)定情況一般,彼此之通信管道不可靠,一旦客戶端網(wǎng)絡(luò)斷開,情況就會(huì)很嚴(yán)重,很難恢復(fù)原狀。
但別忘記,很多客戶端會(huì)有永久性存儲(chǔ)設(shè)備支持,比如閃存ROM、存儲(chǔ)卡等,在通信出現(xiàn)異常的情況下可以用于保存關(guān)鍵數(shù)據(jù)或狀態(tài)信息等。
總之,異常網(wǎng)絡(luò)情況很復(fù)雜,只能小心處理之。
消息重發(fā)策略
在QoS > 0情況下,PUBLISH、PUBREL、SUBSCRIBE、UNSUBSCRIBE等類型消息在發(fā)送者發(fā)送完之后,需要等待一個(gè)響應(yīng)消息,若在一個(gè)指定時(shí)間段內(nèi)沒有收到,發(fā)送者可能需要重試。重發(fā)的消息,要求DUP標(biāo)記要設(shè)置為1.
等待響應(yīng)的超時(shí)應(yīng)該在消息成功發(fā)送之后開始算起,并且等待超時(shí)應(yīng)該是可以配置選項(xiàng),以便在下一次重試的時(shí)候,適當(dāng)加大。比如第一次重試超時(shí)10秒,下一次可能為20秒,再一次重試可能為60秒呢。當(dāng)然,還要有一個(gè)重試次數(shù)限制的。
還有一種情況,客戶端重新連接,但未在可變頭部中設(shè)置clean session標(biāo)記,但雙方(客戶端和服務(wù)器端)都應(yīng)該重試先前未發(fā)送的動(dòng)態(tài)消息(in-flight messages)??蛻舳瞬槐粡?qiáng)制要求發(fā)送未被確認(rèn)的消息,但服務(wù)器端就得需要重發(fā)那些未被去確認(rèn)的消息。
QoS level決定的消息流
QoS level為Quality of Service level的縮寫,翻譯成中文,服務(wù)質(zhì)量等級(jí)。
MQTT 3.1協(xié)議在"4.1 Quality of Service levels and flows"章節(jié)中,僅僅討論了客戶端到服務(wù)器的發(fā)布流程,不太完整。因?yàn)闆Q定消息到達(dá)率,能夠提升發(fā)送質(zhì)量的,應(yīng)該是服務(wù)器發(fā)布PUBLISH消息到訂閱者這一消息流方向。
QoS level 0
至多發(fā)送一次,發(fā)送即丟棄。沒有確認(rèn)消息,也不知道對(duì)方是否收到。
Client | Message and direction | Server |
---|---|---|
QoS = 0 | PUBLISH ----------> | Action: Publish message to subscribers then Forget Reception: <=1 |
針對(duì)的消息不重要,丟失也無所謂。
網(wǎng)絡(luò)層面,傳輸壓力小。
QoS level 1
所有QoS level 1都要在可變頭部中附加一個(gè)16位的消息ID。
SUBSCRIBE和UNSUBSCRIBE消息使用QoS level 1。
針對(duì)消息的發(fā)布,Qos level 1,意味著消息至少被傳輸一次。
發(fā)送者若在一段時(shí)間內(nèi)接收不到PUBACK消息,發(fā)送者需要打開DUB標(biāo)記為1,然后重新發(fā)送PUBLISH消息。因此會(huì)導(dǎo)致接收方可能會(huì)收到兩次PUBLISH消息。針對(duì)客戶端發(fā)布消息到服務(wù)器的消息流:
Client | Message and direction | Server |
---|---|---|
QoS = 1 DUP = 0 Message ID = x Action: Store message | PUBLISH ----------> | Actions:
Reception: >=1 |
Action: Discard message | PUBACK <---------- | Message ID = x |
針對(duì)服務(wù)器發(fā)布到訂閱者的消息流:
Server | Message and direction | Subscriber |
---|---|---|
QoS = 1 DUP = 0 Message ID = x | PUBLISH ----------> | Actions:
Reception: >=1 |
PUBACK <---------- | Message ID = x |
發(fā)布者(客戶端/服務(wù)器)若因種種異常接收不到PUBACK消息,會(huì)再次重新發(fā)送PUBLISH消息,同時(shí)設(shè)置DUP標(biāo)記為1。接收者以服務(wù)器為例,這可能會(huì)導(dǎo)致服務(wù)器收到重復(fù)消息,按照流程,broker(服務(wù)器)發(fā)布消息到訂閱者(會(huì)導(dǎo)致訂閱者接收到重復(fù)消息),然后發(fā)送一條PUBACK確認(rèn)消息到發(fā)布者。
在業(yè)務(wù)層面,或許可以彌補(bǔ)MQTT協(xié)議的不足之處:重試的消息ID一定要一致接收方一定判斷當(dāng)前接收的消息ID是否已經(jīng)接受過
但一樣不能夠完全確保,消息一定到達(dá)了。
QoS level 2
僅僅在PUBLISH類型消息中出現(xiàn),要求在可變頭部中要附加消息ID。
級(jí)別高,通信壓力稍大些,但確保了僅僅傳輸接收一次。
先看協(xié)議中流程圖,Client -> Server方向,會(huì)有一個(gè)總體印象:
Client | Message and direction | Server |
---|---|---|
QoS = 2 DUP = 0 Message ID = x Action: Store message | PUBLISH ----------> | Action(a) Store message or Actions(b):
|
PUBREC <---------- | Message ID = x | |
Message ID = x | PUBREL ----------> | Actions(a):
or Action(b): Delete message ID |
Action: Discard message | PUBCOMP <---------- | Message ID = x |
Server -> Subscriber:
Server | Message and direction | Subscriber |
---|---|---|
QoS = 2 DUP = 0 Message ID = x | PUBLISH ----------> | Action: Store message |
PUBREC <---------- | Message ID = x | |
Message ID = x | PUBREL ----------> | Actions:
|
PUBCOMP <---------- | Message ID = x |
Server端采取的方案a和b,都包含了何時(shí)消息有效,何時(shí)處理消息。兩個(gè)方案二選一,Server端自己決定。但無論死采取哪一種方式,都是在QoS level 2協(xié)議范疇下,不受影響。若一方?jīng)]有接收到對(duì)應(yīng)的確認(rèn)消息,會(huì)從最近一次需要確認(rèn)的消息重試,以便整個(gè)(QoS level 2)流程打通。
消息順序
消息順序會(huì)受許多因素的影響,但對(duì)于服務(wù)器程序,必須保證消息傳遞流程的每個(gè)階段要和開始的順序一致。例如,在QoS level 2定義的消息流中,PUBREL流必須和PUBLISH流具有相同的順序發(fā)送:
Client | Message and direction | Server |
---|---|---|
PUBLISH 1----------> PUBLISH 2 ----------> PUBLISH 3 ----------> | ||
PUBREC 1<---------- PUBREC 2 <---------- | ||
PUBREL 1----------> | ||
PUBREC 3<---------- | ||
PUBREL 2----------> | ||
PUBCOMP 1<---------- | ||
PUBREL 3----------> | ||
PUBCOMP 2<---------- PUBCOMP 3 <---------- |
流動(dòng)消息(in-flight messages)數(shù)量允許有一個(gè)可保證的效果:
- 在流動(dòng)消息(in-flight)窗口1中,每個(gè)傳遞流在下一個(gè)流開始之前完成。這保證消息以提交的順序傳遞
- 在流動(dòng)消息(in-flight)大于1的窗口,只能在QoS level內(nèi)被保證消息的順序
消息的持久化
在MQTT協(xié)議中,PUBLISH消息固定頭部RETAIN標(biāo)記,只有為1才要求服務(wù)器需要持久保存此消息,除非新的PUBLISH覆蓋。
對(duì)于持久的、最新一條PUBLISH消息,服務(wù)器不但要發(fā)送給當(dāng)前的訂閱者,并且新的訂閱者(new subscriber,同樣需要訂閱了此消息對(duì)應(yīng)的Topic name)會(huì)馬上得到推送。
Tip:新來乍到的訂閱者,只會(huì)取出最新的一個(gè)RETAIN flag = 1的消息推送,不是所有。
消息流的編碼/解碼
MQTT協(xié)議中,由目前定義的14種類型消息在客戶端和服務(wù)器端之間數(shù)據(jù)進(jìn)行交互。若以JAVA語言構(gòu)建MQTT服務(wù)器,可選擇Netty作為基礎(chǔ)。
在Netty中,數(shù)據(jù)的進(jìn)入和流出,代表了一次完整的交互。無論是要進(jìn)入的還是要流出的數(shù)據(jù)(單獨(dú)以服務(wù)器為例),都可看做字節(jié)流。若把每種類型消息抽象為一個(gè)具體對(duì)象,那么處理起來就不難了。
客戶端->服務(wù)器,進(jìn)入的字節(jié)流,逐個(gè)字節(jié)/單位讀取,可還原成一個(gè)具體的消息對(duì)象(解碼的過程)。
要發(fā)送到客戶端的消息對(duì)象,轉(zhuǎn)換(編碼)成字節(jié)流,然后由TCP通道流轉(zhuǎn)到接收者。
小結(jié)
斷斷續(xù)續(xù)記錄了MQTT 3.1協(xié)議的若干閱讀筆記,總之是把協(xié)議個(gè)人認(rèn)為不夠清晰,或者我不好理解的地方,著重進(jìn)行了分析。也便于自己以后回過來頭來翻閱,不是那么快的忘卻。
posted on 2014-02-15 19:17 nieyong 閱讀(41526) 評(píng)論(9) 編輯 收藏 所屬分類: MQTT