聶永的博客

          記錄工作/學(xué)習(xí)的點(diǎn)點(diǎn)滴滴。

          MQTT協(xié)議筆記之頭部信息

          前言

          MQTT(Message Queue Telemetry Transport),遙測(cè)傳輸協(xié)議,提供訂閱/發(fā)布模式,更為簡(jiǎn)約、輕量,易于使用,針對(duì)受限環(huán)境(帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定),可以簡(jiǎn)單概括為物聯(lián)網(wǎng)打造,官方總結(jié)特點(diǎn)如下:

          1.使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合。
          2. 對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸。
          3. 使用 TCP/IP 提供網(wǎng)絡(luò)連接。
          4. 有三種消息發(fā)布服務(wù)質(zhì)量:
              “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。
              “至少一次”,確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。
              “只有一次”,確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。
          5. 小型傳輸,開銷很小(固定長(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量。
          6. 使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
          

          MQTT 3.1協(xié)議在線版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

          官方下載地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

          PDF版本,42頁(yè),不算多。

          另外,目前MQTT大家都用在了手機(jī)推送,可能還有很多的使用方式,有待進(jìn)一步的探索。

          協(xié)議方面,以前曾簡(jiǎn)單實(shí)現(xiàn)過一點(diǎn)HTTP協(xié)議,基于HTTP上構(gòu)建若干種通信管道的socket.io協(xié)議,不過socket.io 0.9版本的協(xié)議才兩三頁(yè)而已。面對(duì)領(lǐng)域不同,自然解決的方式也不一樣。

          閱讀完畢MQTT協(xié)議,有一個(gè)想法,其實(shí)可以基于MQTT協(xié)議,打造更加私有、精簡(jiǎn)(協(xié)議一些地方,略顯多余)的傳輸協(xié)議,比如一個(gè)字節(jié)的傳輸開銷。有時(shí)間,會(huì)詳細(xì)說一下。

          固定頭部

          固定頭部,使用兩個(gè)字節(jié),共16位:

          bit 7 6 5 4 3 2 1 0
          byte 1 Message Type DUP flag QoS level RETAIN
          byte 2 Remaining Length

          第一個(gè)字節(jié)(byte 1)

          消息類型(4-7),使用4位二進(jìn)制表示,可代表16種消息類型:

          Mnemonic Enumeration Description
          Reserved 0 Reserved
          CONNECT 1 Client request to connect to Server
          CONNACK 2 Connect Acknowledgment
          PUBLISH 3 Publish message
          PUBACK 4 Publish Acknowledgment
          PUBREC 5 Publish Received (assured delivery part 1)
          PUBREL 6 Publish Release (assured delivery part 2)
          PUBCOMP 7 Publish Complete (assured delivery part 3)
          SUBSCRIBE 8 Client Subscribe request
          SUBACK 9 Subscribe Acknowledgment
          UNSUBSCRIBE 10 Client Unsubscribe request
          UNSUBACK 11 Unsubscribe Acknowledgment
          PINGREQ 12 PING Request
          PINGRESP 13 PING Response
          DISCONNECT 14 Client is Disconnecting
          Reserved 15 Reserved

          除去0和15位置屬于保留待用,共14種消息事件類型。

          DUP flag(打開標(biāo)志)

          保證消息可靠傳輸,默認(rèn)為0,只占用一個(gè)字節(jié),表示第一次發(fā)送。不能用于檢測(cè)消息重復(fù)發(fā)送等。只適用于客戶端或服務(wù)器端嘗試重發(fā)PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要滿足以下條件:

           當(dāng)QoS > 0
           消息需要回復(fù)確認(rèn)
          

          此時(shí),在可變頭部需要包含消息ID。當(dāng)值為1時(shí),表示當(dāng)前消息先前已經(jīng)被傳送過。

          QoS(Quality of Service,服務(wù)質(zhì)量)

          使用兩個(gè)二進(jìn)制表示PUBLISH類型消息:

          QoS value bit 2 bit 1 Description
          0 0 0 至多一次 發(fā)完即丟棄 <=1
          1 0 1 至少一次 需要確認(rèn)回復(fù) >=1
          2 1 0 只有一次 需要確認(rèn)回復(fù) =1
          3 1 1 待用,保留位置

          RETAIN(保持)

          僅針對(duì)PUBLISH消息。不同值,不同含義:

          1:表示發(fā)送的消息需要一直持久保存(不受服務(wù)器重啟影響),不但要發(fā)送給當(dāng)前的訂閱者,并且以后新來的訂閱了此Topic name的訂閱者會(huì)馬上得到推送。

          備注:新來乍到的訂閱者,只會(huì)取出最新的一個(gè)RETAIN flag = 1的消息推送。

          0:僅僅為當(dāng)前訂閱者推送此消息。

          假如服務(wù)器收到一個(gè)空消息體(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服務(wù)器可以刪除掉對(duì)應(yīng)的已被持久化的PUBLISH消息。

          如何解析

          因?yàn)閖ava使用有符號(hào)(最高位為符號(hào)位)數(shù)據(jù)表示,byte范圍:-128-127。該字節(jié)的最高位(左邊第一位),可能為1。若直接轉(zhuǎn)換為byte類型,會(huì)出現(xiàn)負(fù)數(shù),這是一個(gè)雷區(qū)。DataInputStream提供了int readUnsignedByte()讀取方式,請(qǐng)注意。下面演示了,如何從一個(gè)字節(jié)中,獲取到所有定義的信息,同時(shí)繞過雷區(qū):

          public static void main(String[] args) {
              byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0
          
              doGetBit(publishFixHeader);
              int ori = 224;//1110000,DISCONNECT ,Message Type (14)
              byte flag = (byte) ori; //有符號(hào)byte       
              doGetBit(flag);
              doGetBit_v2(ori);
          }
          
          
          public static void doGetBit(byte flags) {
              boolean retain = (flags & 1) > 0;
              int qosLevel = (flags & 0x06) >> 1;
              boolean dupFlag = (flags & 8) > 0;
              int messageType = (flags >> 4) & 0x0f;
          
              System.out.format(
                      "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
                      messageType, dupFlag, qosLevel, retain);
          }
          
          public static void doGetBit_v2(int flags) {
              boolean retain = (flags & 1) > 0;
              int qosLevel = (flags & 0x06) >> 1;
              boolean dupFlag = (flags & 8) > 0;
              int messageType = flags >> 4;
          
              System.out.format(
                      "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
                      messageType, dupFlag, qosLevel, retain);
          }
          

          處理Remaining Length(剩余長(zhǎng)度)

          在當(dāng)前消息中剩余的byte(字節(jié))數(shù),包含可變頭部和負(fù)荷(稱之為內(nèi)容/body,更為合適)。單個(gè)字節(jié)最大值:01111111,16進(jìn)制:0x7F,10進(jìn)制為127。單個(gè)字節(jié)為什么不能是11111111(0xFF)呢?因?yàn)镸QTT協(xié)議規(guī)定,第八位(最高位)若為1,則表示還有后續(xù)字節(jié)存在。同時(shí)MQTT協(xié)議最多允許4個(gè)字節(jié)表示剩余長(zhǎng)度。那么最大長(zhǎng)度為:0xFF,0xFF,0xFF,0x7F,二進(jìn)制表示為:11111111,11111111,11111111,01111111,十進(jìn)制:268435455 byte=261120KB=256MB=0.25GB 四個(gè)字節(jié)之間值的范圍:

          Digits From To
          1 0 (0x00) 127 (0x7F)
          2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
          3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
          4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

          如何換算成十進(jìn)制呢 ? 使用java語(yǔ)言表示如下:

          public static void main(String[] args) throws IOException {
              // 模擬客戶端寫入
             ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
             DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
             dataOutputStream.write(0xff);
             dataOutputStream.write(0xff);
             dataOutputStream.write(0xff);
             dataOutputStream.write(0x7f);
          
             InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
          
              // 模擬服務(wù)器/客戶端解析
             System. out.println( "result is " + bytes2Length(arrayInputStream));
          }
          
          /**
          * 轉(zhuǎn)化字節(jié)為 int類型長(zhǎng)度
          * @param in
          * @return
          * @throws IOException
          */
          private static int bytes2Length(InputStream in) throws IOException {
              int multiplier = 1;
              int length = 0;
              int digit = 0;
              do {
                  digit = in.read(); //一個(gè)字節(jié)的有符號(hào)或者無符號(hào),轉(zhuǎn)換轉(zhuǎn)換為四個(gè)字節(jié)有符號(hào) int類型
                  length += (digit & 0x7f) * multiplier;
                  multiplier *= 128;
             } while ((digit & 0x80) != 0);
          
              return length;
          }
          

          一般最后一個(gè)字節(jié)小于127(01111111),和0x80(10000000)進(jìn)行&操作,最終結(jié)果都為0,因此計(jì)算會(huì)終止。代理中間件和請(qǐng)求者,中間傳遞的是字節(jié)流Stream,自然要從流中讀取,逐一解析出來。

          那么如何將int類型長(zhǎng)度解析為不確定的字節(jié)值呢?

          public static void main(String[] args) throws IOException {
              // 模擬服務(wù)器/客戶端寫入
             ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
             DataOutputStream dataOutputStream = new DataOutputStream(
                       arrayOutputStream);
          
              // 模擬服務(wù)器/客戶端解析
              length2Bytes(dataOutputStream, 128);
          }
          
          /**
          * int類型長(zhǎng)度解析為1-4個(gè)字節(jié)
          * @param out
          * @param length
          * @throws IOException
          */
          private static void length2Bytes(OutputStream out, int length)
                   throws IOException {
              int val = length;
              do {
                   int digit = val % 128;
                  val = val / 128;
                   if (val > 0)
                       digit = digit | 0x80;
          
                  out.write(digit);
             } while (val > 0);
          }
          

          digit對(duì)val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 請(qǐng)注意:剩余長(zhǎng)度,只在固定頭部中,無論是一個(gè)字節(jié),還是四個(gè)字節(jié),不能被算作可變頭部中。

          可變頭部

          固定頭部?jī)H定義了消息類型和一些標(biāo)志位,一些消息的元數(shù)據(jù),需要放入可變頭部中。可變頭部?jī)?nèi)容字節(jié)長(zhǎng)度 + Playload/負(fù)荷字節(jié)長(zhǎng)度 = 剩余長(zhǎng)度,這個(gè)是需要牢記的。可變頭部,包含了協(xié)議名稱,版本號(hào),連接標(biāo)志,用戶授權(quán),心跳時(shí)間等內(nèi)容,這部分和后面要講到的CONNECT消息類型,有重復(fù),暫時(shí)略過。

          Playload/消息體/負(fù)荷

          消息體主要是為配合固定/可變頭部命令(比如CONNECT可變頭部User name標(biāo)記若為1則需要在消息體中附加用戶名稱字符串)而存在。

          CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息體。PUBLISH的消息體以二進(jìn)制形式對(duì)待。

          請(qǐng)記住,MQTT協(xié)議只允許在PUBLISH類型消息體中使用自定義特性,在固定/可變頭部想加入自定義私有特性,就免了吧。這也是為了協(xié)議免于流于形式,變得很分裂也為了兼顧現(xiàn)有客戶端等。比如支持壓縮等,那就可以在Playload中定義數(shù)據(jù)支持,在應(yīng)用中進(jìn)行讀取處理。

          這部分會(huì)在后面詳細(xì)論述。

          消息標(biāo)識(shí)符/消息ID

          固定頭中的QoS level標(biāo)志值為1或2時(shí)才會(huì)在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可變頭中出現(xiàn)。

          一個(gè)16位無符號(hào)位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個(gè)特定方向(服務(wù)器發(fā)往客戶端為一個(gè)方向,客戶端發(fā)送到服務(wù)器端為另一個(gè)方向)的通信消息中必須唯一。比如客戶端發(fā)往服務(wù)器,有可能存在服務(wù)器發(fā)往客戶端會(huì)同時(shí)存在重復(fù),但不礙事。

          可變頭部中,需要兩個(gè)字節(jié)的順序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻譯成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左邊/上面,表示這是一個(gè)大端字節(jié)/網(wǎng)絡(luò)字節(jié)序,符合人的閱讀習(xí)慣,高位在最左邊。

          bit 7 6 5 4 3 2 1 0
            Message Identifier MSB
            Message Identifier LSB

          但凡如此表示的,都可以視為一個(gè)16位無符號(hào)short類型整數(shù),兩個(gè)字節(jié)表示。在JAVA中處理比較簡(jiǎn)單:

          DataInputStream.readUnsignedShort
          

          或者

          in.read() * 0xFF + in.read();
          

          最大長(zhǎng)度可為: 65535

          UTF-8編碼

          有關(guān)字符串,MQTT采用的是修改版的UTF-8編碼,一般形式為如下,需要牢記:

          bit 7 6 5 4 3 2 1 0
          byte 1 String Length MSB
          byte 2 String Length LSB
          bytes 3 ... Encoded Character Data

          比如AVA,使用writeUTF()方法寫入一串文字“OTWP”,頭兩個(gè)字節(jié)為一個(gè)完整的無符號(hào)數(shù)字,代表字符串字節(jié)長(zhǎng)度,后面四個(gè)字節(jié)才是字符串真正的長(zhǎng)度,共六個(gè)字節(jié):

          bit 7 6 5 4 3 2 1 0
          byte 1 Message Length MSB (0x00)
            0 0 0 0 0 0 0 0
          byte 2 Message Length LSB (0x04)
            0 0 0 0 0 1 0 0
          byte 3 'O' (0x4F)
            0 1 0 0 1 1 1 1
          byte 4 'T' (0x54)
            0 1 0 1 0 1 0 0
          byte 5 'W' (0x57)
            0 1 0 1 0 1 1 1
          byte 6 'P' (0x50)
            0 1 0 1 0 0 0 0

          這點(diǎn),在程序中,可不用單獨(dú)處理默認(rèn),直接使用readUTF()方法,可自動(dòng)省去了處理字符串長(zhǎng)度的麻煩。當(dāng)然,可以手動(dòng)讀取字符串:

          // 模擬寫入
          dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
          ......
          // 模擬讀取 
          int decodedLength = dataInputStream.readUnsignedShort();//2 byte
          byte[] decodedString = new byte[decodedLength]; // 4 bytes
          dataInputStream.read(decodedString);
          String target = new String(decodedString, "UTF-8");
          

          等同于:

          String target = dataInputStream.readUTF();
          

          MQTT無論是可變頭部還是消息體中,只要是字符串部分,都是采用了修改版的UTF-8編碼,讀取和寫入,借助DataInputStream/DataOutputStream的幫助,一行語(yǔ)句,略去了手動(dòng)處理的麻煩。

          小結(jié)

          總之,掌握固定頭部的QoS level、RETAIN標(biāo)記、可變頭部的Connect flags作用和意義,對(duì)總體理解MQTT作用很大。

          posted on 2014-02-07 17:35 nieyong 閱讀(39593) 評(píng)論(2)  編輯  收藏 所屬分類: MQTT

          評(píng)論

          # re: MQTT協(xié)議筆記之頭部信息 2014-11-17 20:59 金敏通

          可以的  回復(fù)  更多評(píng)論   

          # re: MQTT協(xié)議筆記之頭部信息 2014-12-26 15:37 光輝

          【一個(gè)16位無符號(hào)位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個(gè)特定方向(服務(wù)器發(fā)往客戶端為一個(gè)方向,客戶端發(fā)送到服務(wù)器端為另一個(gè)方向)的通信消息中必須唯一。比如客戶端發(fā)往服務(wù)器,有可能存在服務(wù)器發(fā)往客戶端會(huì)同時(shí)存在重復(fù),但不礙事。】
          ===========================
          剛才看了一下Paho.MQTT.Client(WebSocket)中發(fā)送messae時(shí)的messageIdentifier的實(shí)現(xiàn),這個(gè)id當(dāng)qos大于0時(shí),
          并不是唯一的,它只是確保在已發(fā)送但未收到PUBACK 的in flight message中是唯一的,例如,當(dāng)發(fā)送了
          消息A時(shí)使用id(3),并且收到A的PUBACK了,那么下次發(fā)送的message的id仍然是3。另外,當(dāng)Id到一個(gè)最大
          值時(shí),這個(gè)Id會(huì)被reset
          /* The largest message identifier allowed, may not be larger than 2**16 but
          * if set smaller reduces the maximum number of outbound messages allowed.
          */
          ClientImpl.prototype.maxMessageIdentifier = 65536;

          if (this._message_identifier === this.maxMessageIdentifier) {
          this._message_identifier = 1;
          }  回復(fù)  更多評(píng)論   

          公告

          所有文章皆為原創(chuàng),若轉(zhuǎn)載請(qǐng)標(biāo)明出處,謝謝~

          新浪微博,歡迎關(guān)注:

          導(dǎo)航

          <2014年2月>
          2627282930311
          2345678
          9101112131415
          16171819202122
          2324252627281
          2345678

          統(tǒng)計(jì)

          常用鏈接

          留言簿(58)

          隨筆分類(130)

          隨筆檔案(151)

          個(gè)人收藏

          最新隨筆

          搜索

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 桃园县| 襄汾县| 田阳县| 泰来县| 茶陵县| 阜宁县| 和田县| 镇雄县| 陈巴尔虎旗| 扎鲁特旗| 昂仁县| 左云县| 蓝山县| 商南县| 义乌市| 吴川市| 文昌市| 宣汉县| 韩城市| 梁河县| 岐山县| 隆子县| 绿春县| 隆回县| 井冈山市| 漳平市| 阜平县| 昭苏县| 仁化县| 友谊县| 普兰县| 胶州市| 观塘区| 澄迈县| 大港区| 台前县| 阿克苏市| 平乐县| 孟村| 常州市| 布拖县|