MQTT協(xié)議筆記之頭部信息
前言
MQTT(Message Queue Telemetry Transport),遙測(cè)傳輸協(xié)議,提供訂閱/發(fā)布模式,更為簡(jiǎn)約、輕量,易于使用,針對(duì)受限環(huán)境(帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定),可以簡(jiǎn)單概括為物聯(lián)網(wǎng)打造,官方總結(jié)特點(diǎn)如下:
1.使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合。
2. 對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸。
3. 使用 TCP/IP 提供網(wǎng)絡(luò)連接。
4. 有三種消息發(fā)布服務(wù)質(zhì)量:
“至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。
“至少一次”,確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。
“只有一次”,確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
5. 小型傳輸,開銷很小(固定長(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量。
6. 使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
MQTT 3.1協(xié)議在線版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
官方下載地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf
PDF版本,42頁(yè),不算多。
另外,目前MQTT大家都用在了手機(jī)推送,可能還有很多的使用方式,有待進(jìn)一步的探索。
協(xié)議方面,以前曾簡(jiǎn)單實(shí)現(xiàn)過一點(diǎn)HTTP協(xié)議,基于HTTP上構(gòu)建若干種通信管道的socket.io協(xié)議,不過socket.io 0.9版本的協(xié)議才兩三頁(yè)而已。面對(duì)領(lǐng)域不同,自然解決的方式也不一樣。
閱讀完畢MQTT協(xié)議,有一個(gè)想法,其實(shí)可以基于MQTT協(xié)議,打造更加私有、精簡(jiǎn)(協(xié)議一些地方,略顯多余)的傳輸協(xié)議,比如一個(gè)字節(jié)的傳輸開銷。有時(shí)間,會(huì)詳細(xì)說一下。
固定頭部
固定頭部,使用兩個(gè)字節(jié),共16位:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Type | DUP flag | QoS level | RETAIN | ||||
byte 2 | Remaining Length |
第一個(gè)字節(jié)(byte 1)
消息類型(4-7),使用4位二進(jìn)制表示,可代表16種消息類型:
Mnemonic | Enumeration | Description |
---|---|---|
Reserved | 0 | Reserved |
CONNECT | 1 | Client request to connect to Server |
CONNACK | 2 | Connect Acknowledgment |
PUBLISH | 3 | Publish message |
PUBACK | 4 | Publish Acknowledgment |
PUBREC | 5 | Publish Received (assured delivery part 1) |
PUBREL | 6 | Publish Release (assured delivery part 2) |
PUBCOMP | 7 | Publish Complete (assured delivery part 3) |
SUBSCRIBE | 8 | Client Subscribe request |
SUBACK | 9 | Subscribe Acknowledgment |
UNSUBSCRIBE | 10 | Client Unsubscribe request |
UNSUBACK | 11 | Unsubscribe Acknowledgment |
PINGREQ | 12 | PING Request |
PINGRESP | 13 | PING Response |
DISCONNECT | 14 | Client is Disconnecting |
Reserved | 15 | Reserved |
除去0和15位置屬于保留待用,共14種消息事件類型。
DUP flag(打開標(biāo)志)
保證消息可靠傳輸,默認(rèn)為0,只占用一個(gè)字節(jié),表示第一次發(fā)送。不能用于檢測(cè)消息重復(fù)發(fā)送等。只適用于客戶端或服務(wù)器端嘗試重發(fā)PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要滿足以下條件:
當(dāng)QoS > 0
消息需要回復(fù)確認(rèn)
此時(shí),在可變頭部需要包含消息ID。當(dāng)值為1時(shí),表示當(dāng)前消息先前已經(jīng)被傳送過。
QoS(Quality of Service,服務(wù)質(zhì)量)
使用兩個(gè)二進(jìn)制表示PUBLISH類型消息:
QoS value | bit 2 | bit 1 | Description | ||
---|---|---|---|---|---|
0 | 0 | 0 | 至多一次 | 發(fā)完即丟棄 | <=1 |
1 | 0 | 1 | 至少一次 | 需要確認(rèn)回復(fù) | >=1 |
2 | 1 | 0 | 只有一次 | 需要確認(rèn)回復(fù) | =1 |
3 | 1 | 1 | 待用,保留位置 |
RETAIN(保持)
僅針對(duì)PUBLISH消息。不同值,不同含義:
1:表示發(fā)送的消息需要一直持久保存(不受服務(wù)器重啟影響),不但要發(fā)送給當(dāng)前的訂閱者,并且以后新來的訂閱了此Topic name的訂閱者會(huì)馬上得到推送。
備注:新來乍到的訂閱者,只會(huì)取出最新的一個(gè)RETAIN flag = 1的消息推送。
0:僅僅為當(dāng)前訂閱者推送此消息。
假如服務(wù)器收到一個(gè)空消息體(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服務(wù)器可以刪除掉對(duì)應(yīng)的已被持久化的PUBLISH消息。
如何解析
因?yàn)閖ava使用有符號(hào)(最高位為符號(hào)位)數(shù)據(jù)表示,byte范圍:-128-127。該字節(jié)的最高位(左邊第一位),可能為1。若直接轉(zhuǎn)換為byte類型,會(huì)出現(xiàn)負(fù)數(shù),這是一個(gè)雷區(qū)。DataInputStream提供了int readUnsignedByte()讀取方式,請(qǐng)注意。下面演示了,如何從一個(gè)字節(jié)中,獲取到所有定義的信息,同時(shí)繞過雷區(qū):
public static void main(String[] args) {
byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0
doGetBit(publishFixHeader);
int ori = 224;//1110000,DISCONNECT ,Message Type (14)
byte flag = (byte) ori; //有符號(hào)byte
doGetBit(flag);
doGetBit_v2(ori);
}
public static void doGetBit(byte flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = (flags >> 4) & 0x0f;
System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
messageType, dupFlag, qosLevel, retain);
}
public static void doGetBit_v2(int flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = flags >> 4;
System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
messageType, dupFlag, qosLevel, retain);
}
處理Remaining Length(剩余長(zhǎng)度)
在當(dāng)前消息中剩余的byte(字節(jié))數(shù),包含可變頭部和負(fù)荷(稱之為內(nèi)容/body,更為合適)。單個(gè)字節(jié)最大值:01111111,16進(jìn)制:0x7F,10進(jìn)制為127。單個(gè)字節(jié)為什么不能是11111111(0xFF)呢?因?yàn)镸QTT協(xié)議規(guī)定,第八位(最高位)若為1,則表示還有后續(xù)字節(jié)存在。同時(shí)MQTT協(xié)議最多允許4個(gè)字節(jié)表示剩余長(zhǎng)度。那么最大長(zhǎng)度為:0xFF,0xFF,0xFF,0x7F,二進(jìn)制表示為:11111111,11111111,11111111,01111111,十進(jìn)制:268435455 byte=261120KB=256MB=0.25GB 四個(gè)字節(jié)之間值的范圍:
Digits | From | To |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
如何換算成十進(jìn)制呢 ? 使用java語(yǔ)言表示如下:
public static void main(String[] args) throws IOException {
// 模擬客戶端寫入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0x7f);
InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
// 模擬服務(wù)器/客戶端解析
System. out.println( "result is " + bytes2Length(arrayInputStream));
}
/**
* 轉(zhuǎn)化字節(jié)為 int類型長(zhǎng)度
* @param in
* @return
* @throws IOException
*/
private static int bytes2Length(InputStream in) throws IOException {
int multiplier = 1;
int length = 0;
int digit = 0;
do {
digit = in.read(); //一個(gè)字節(jié)的有符號(hào)或者無符號(hào),轉(zhuǎn)換轉(zhuǎn)換為四個(gè)字節(jié)有符號(hào) int類型
length += (digit & 0x7f) * multiplier;
multiplier *= 128;
} while ((digit & 0x80) != 0);
return length;
}
一般最后一個(gè)字節(jié)小于127(01111111),和0x80(10000000)進(jìn)行&操作,最終結(jié)果都為0,因此計(jì)算會(huì)終止。代理中間件和請(qǐng)求者,中間傳遞的是字節(jié)流Stream,自然要從流中讀取,逐一解析出來。
那么如何將int類型長(zhǎng)度解析為不確定的字節(jié)值呢?
public static void main(String[] args) throws IOException {
// 模擬服務(wù)器/客戶端寫入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(
arrayOutputStream);
// 模擬服務(wù)器/客戶端解析
length2Bytes(dataOutputStream, 128);
}
/**
* int類型長(zhǎng)度解析為1-4個(gè)字節(jié)
* @param out
* @param length
* @throws IOException
*/
private static void length2Bytes(OutputStream out, int length)
throws IOException {
int val = length;
do {
int digit = val % 128;
val = val / 128;
if (val > 0)
digit = digit | 0x80;
out.write(digit);
} while (val > 0);
}
digit對(duì)val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 請(qǐng)注意:剩余長(zhǎng)度,只在固定頭部中,無論是一個(gè)字節(jié),還是四個(gè)字節(jié),不能被算作可變頭部中。
可變頭部
固定頭部?jī)H定義了消息類型和一些標(biāo)志位,一些消息的元數(shù)據(jù),需要放入可變頭部中。可變頭部?jī)?nèi)容字節(jié)長(zhǎng)度 + Playload/負(fù)荷字節(jié)長(zhǎng)度 = 剩余長(zhǎng)度,這個(gè)是需要牢記的。可變頭部,包含了協(xié)議名稱,版本號(hào),連接標(biāo)志,用戶授權(quán),心跳時(shí)間等內(nèi)容,這部分和后面要講到的CONNECT消息類型,有重復(fù),暫時(shí)略過。
Playload/消息體/負(fù)荷
消息體主要是為配合固定/可變頭部命令(比如CONNECT可變頭部User name標(biāo)記若為1則需要在消息體中附加用戶名稱字符串)而存在。
CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息體。PUBLISH的消息體以二進(jìn)制形式對(duì)待。
請(qǐng)記住,MQTT協(xié)議只允許在PUBLISH類型消息體中使用自定義特性,在固定/可變頭部想加入自定義私有特性,就免了吧。這也是為了協(xié)議免于流于形式,變得很分裂也為了兼顧現(xiàn)有客戶端等。比如支持壓縮等,那就可以在Playload中定義數(shù)據(jù)支持,在應(yīng)用中進(jìn)行讀取處理。
這部分會(huì)在后面詳細(xì)論述。
消息標(biāo)識(shí)符/消息ID
固定頭中的QoS level標(biāo)志值為1或2時(shí)才會(huì)在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可變頭中出現(xiàn)。
一個(gè)16位無符號(hào)位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個(gè)特定方向(服務(wù)器發(fā)往客戶端為一個(gè)方向,客戶端發(fā)送到服務(wù)器端為另一個(gè)方向)的通信消息中必須唯一。比如客戶端發(fā)往服務(wù)器,有可能存在服務(wù)器發(fā)往客戶端會(huì)同時(shí)存在重復(fù),但不礙事。
可變頭部中,需要兩個(gè)字節(jié)的順序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻譯成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左邊/上面,表示這是一個(gè)大端字節(jié)/網(wǎng)絡(luò)字節(jié)序,符合人的閱讀習(xí)慣,高位在最左邊。
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
Message Identifier MSB | ||||||||
Message Identifier LSB |
但凡如此表示的,都可以視為一個(gè)16位無符號(hào)short類型整數(shù),兩個(gè)字節(jié)表示。在JAVA中處理比較簡(jiǎn)單:
DataInputStream.readUnsignedShort
或者
in.read() * 0xFF + in.read();
最大長(zhǎng)度可為: 65535
UTF-8編碼
有關(guān)字符串,MQTT采用的是修改版的UTF-8編碼,一般形式為如下,需要牢記:
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | String Length MSB | |||||||
byte 2 | String Length LSB | |||||||
bytes 3 ... | Encoded Character Data |
比如AVA,使用writeUTF()方法寫入一串文字“OTWP”,頭兩個(gè)字節(jié)為一個(gè)完整的無符號(hào)數(shù)字,代表字符串字節(jié)長(zhǎng)度,后面四個(gè)字節(jié)才是字符串真正的長(zhǎng)度,共六個(gè)字節(jié):
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | Message Length MSB (0x00) | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
byte 2 | Message Length LSB (0x04) | |||||||
0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | |
byte 3 | 'O' (0x4F) | |||||||
0 | 1 | 0 | 0 | 1 | 1 | 1 | 1 | |
byte 4 | 'T' (0x54) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 | |
byte 5 | 'W' (0x57) | |||||||
0 | 1 | 0 | 1 | 0 | 1 | 1 | 1 | |
byte 6 | 'P' (0x50) | |||||||
0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 |
這點(diǎn),在程序中,可不用單獨(dú)處理默認(rèn),直接使用readUTF()方法,可自動(dòng)省去了處理字符串長(zhǎng)度的麻煩。當(dāng)然,可以手動(dòng)讀取字符串:
// 模擬寫入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模擬讀取
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");
等同于:
String target = dataInputStream.readUTF();
MQTT無論是可變頭部還是消息體中,只要是字符串部分,都是采用了修改版的UTF-8編碼,讀取和寫入,借助DataInputStream/DataOutputStream的幫助,一行語(yǔ)句,略去了手動(dòng)處理的麻煩。
小結(jié)
總之,掌握固定頭部的QoS level、RETAIN標(biāo)記、可變頭部的Connect flags作用和意義,對(duì)總體理解MQTT作用很大。
posted on 2014-02-07 17:35 nieyong 閱讀(39593) 評(píng)論(2) 編輯 收藏 所屬分類: MQTT