聶永的博客

          記錄工作/學(xué)習(xí)的點(diǎn)點(diǎn)滴滴。

          MQTT協(xié)議筆記之頭部信息

          前言

          MQTT(Message Queue Telemetry Transport),遙測傳輸協(xié)議,提供訂閱/發(fā)布模式,更為簡約、輕量,易于使用,針對受限環(huán)境(帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定),可以簡單概括為物聯(lián)網(wǎng)打造,官方總結(jié)特點(diǎn)如下:

          1.使用發(fā)布/訂閱消息模式,提供一對多的消息發(fā)布,解除應(yīng)用程序耦合。
          2. 對負(fù)載內(nèi)容屏蔽的消息傳輸。
          3. 使用 TCP/IP 提供網(wǎng)絡(luò)連接。
          4. 有三種消息發(fā)布服務(wù)質(zhì)量:
              “至多一次”,消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會發(fā)生消息丟失或重復(fù)。這一級別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無所謂,因為不久后還會有第二次發(fā)送。
              “至少一次”,確保消息到達(dá),但消息重復(fù)可能會發(fā)生。
              “只有一次”,確保消息到達(dá)一次。這一級別可用于如下情況,在計費(fèi)系統(tǒng)中,消息重復(fù)或丟失會導(dǎo)致不正確的結(jié)果。
          5. 小型傳輸,開銷很小(固定長度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量。
          6. 使用 Last Will 和 Testament 特性通知有關(guān)各方客戶端異常中斷的機(jī)制。
          

          MQTT 3.1協(xié)議在線版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

          官方下載地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

          PDF版本,42頁,不算多。

          另外,目前MQTT大家都用在了手機(jī)推送,可能還有很多的使用方式,有待進(jìn)一步的探索。

          協(xié)議方面,以前曾簡單實(shí)現(xiàn)過一點(diǎn)HTTP協(xié)議,基于HTTP上構(gòu)建若干種通信管道的socket.io協(xié)議,不過socket.io 0.9版本的協(xié)議才兩三頁而已。面對領(lǐng)域不同,自然解決的方式也不一樣。

          閱讀完畢MQTT協(xié)議,有一個想法,其實(shí)可以基于MQTT協(xié)議,打造更加私有、精簡(協(xié)議一些地方,略顯多余)的傳輸協(xié)議,比如一個字節(jié)的傳輸開銷。有時間,會詳細(xì)說一下。

          固定頭部

          固定頭部,使用兩個字節(jié),共16位:

          bit 7 6 5 4 3 2 1 0
          byte 1 Message Type DUP flag QoS level RETAIN
          byte 2 Remaining Length

          第一個字節(jié)(byte 1)

          消息類型(4-7),使用4位二進(jìn)制表示,可代表16種消息類型:

          Mnemonic Enumeration Description
          Reserved 0 Reserved
          CONNECT 1 Client request to connect to Server
          CONNACK 2 Connect Acknowledgment
          PUBLISH 3 Publish message
          PUBACK 4 Publish Acknowledgment
          PUBREC 5 Publish Received (assured delivery part 1)
          PUBREL 6 Publish Release (assured delivery part 2)
          PUBCOMP 7 Publish Complete (assured delivery part 3)
          SUBSCRIBE 8 Client Subscribe request
          SUBACK 9 Subscribe Acknowledgment
          UNSUBSCRIBE 10 Client Unsubscribe request
          UNSUBACK 11 Unsubscribe Acknowledgment
          PINGREQ 12 PING Request
          PINGRESP 13 PING Response
          DISCONNECT 14 Client is Disconnecting
          Reserved 15 Reserved

          除去0和15位置屬于保留待用,共14種消息事件類型。

          DUP flag(打開標(biāo)志)

          保證消息可靠傳輸,默認(rèn)為0,只占用一個字節(jié),表示第一次發(fā)送。不能用于檢測消息重復(fù)發(fā)送等。只適用于客戶端或服務(wù)器端嘗試重發(fā)PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要滿足以下條件:

           當(dāng)QoS > 0
           消息需要回復(fù)確認(rèn)
          

          此時,在可變頭部需要包含消息ID。當(dāng)值為1時,表示當(dāng)前消息先前已經(jīng)被傳送過。

          QoS(Quality of Service,服務(wù)質(zhì)量)

          使用兩個二進(jìn)制表示PUBLISH類型消息:

          QoS value bit 2 bit 1 Description
          0 0 0 至多一次 發(fā)完即丟棄 <=1
          1 0 1 至少一次 需要確認(rèn)回復(fù) >=1
          2 1 0 只有一次 需要確認(rèn)回復(fù) =1
          3 1 1 待用,保留位置

          RETAIN(保持)

          僅針對PUBLISH消息。不同值,不同含義:

          1:表示發(fā)送的消息需要一直持久保存(不受服務(wù)器重啟影響),不但要發(fā)送給當(dāng)前的訂閱者,并且以后新來的訂閱了此Topic name的訂閱者會馬上得到推送。

          備注:新來乍到的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。

          0:僅僅為當(dāng)前訂閱者推送此消息。

          假如服務(wù)器收到一個空消息體(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服務(wù)器可以刪除掉對應(yīng)的已被持久化的PUBLISH消息。

          如何解析

          因為java使用有符號(最高位為符號位)數(shù)據(jù)表示,byte范圍:-128-127。該字節(jié)的最高位(左邊第一位),可能為1。若直接轉(zhuǎn)換為byte類型,會出現(xiàn)負(fù)數(shù),這是一個雷區(qū)。DataInputStream提供了int readUnsignedByte()讀取方式,請注意。下面演示了,如何從一個字節(jié)中,獲取到所有定義的信息,同時繞過雷區(qū):

          public static void main(String[] args) {
              byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0
          
              doGetBit(publishFixHeader);
              int ori = 224;//1110000,DISCONNECT ,Message Type (14)
              byte flag = (byte) ori; //有符號byte       
              doGetBit(flag);
              doGetBit_v2(ori);
          }
          
          
          public static void doGetBit(byte flags) {
              boolean retain = (flags & 1) > 0;
              int qosLevel = (flags & 0x06) >> 1;
              boolean dupFlag = (flags & 8) > 0;
              int messageType = (flags >> 4) & 0x0f;
          
              System.out.format(
                      "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
                      messageType, dupFlag, qosLevel, retain);
          }
          
          public static void doGetBit_v2(int flags) {
              boolean retain = (flags & 1) > 0;
              int qosLevel = (flags & 0x06) >> 1;
              boolean dupFlag = (flags & 8) > 0;
              int messageType = flags >> 4;
          
              System.out.format(
                      "Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
                      messageType, dupFlag, qosLevel, retain);
          }
          

          處理Remaining Length(剩余長度)

          在當(dāng)前消息中剩余的byte(字節(jié))數(shù),包含可變頭部和負(fù)荷(稱之為內(nèi)容/body,更為合適)。單個字節(jié)最大值:01111111,16進(jìn)制:0x7F,10進(jìn)制為127。單個字節(jié)為什么不能是11111111(0xFF)呢?因為MQTT協(xié)議規(guī)定,第八位(最高位)若為1,則表示還有后續(xù)字節(jié)存在。同時MQTT協(xié)議最多允許4個字節(jié)表示剩余長度。那么最大長度為:0xFF,0xFF,0xFF,0x7F,二進(jìn)制表示為:11111111,11111111,11111111,01111111,十進(jìn)制:268435455 byte=261120KB=256MB=0.25GB 四個字節(jié)之間值的范圍:

          Digits From To
          1 0 (0x00) 127 (0x7F)
          2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
          3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
          4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

          如何換算成十進(jìn)制呢 ? 使用java語言表示如下:

          public static void main(String[] args) throws IOException {
              // 模擬客戶端寫入
             ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
             DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
             dataOutputStream.write(0xff);
             dataOutputStream.write(0xff);
             dataOutputStream.write(0xff);
             dataOutputStream.write(0x7f);
          
             InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
          
              // 模擬服務(wù)器/客戶端解析
             System. out.println( "result is " + bytes2Length(arrayInputStream));
          }
          
          /**
          * 轉(zhuǎn)化字節(jié)為 int類型長度
          * @param in
          * @return
          * @throws IOException
          */
          private static int bytes2Length(InputStream in) throws IOException {
              int multiplier = 1;
              int length = 0;
              int digit = 0;
              do {
                  digit = in.read(); //一個字節(jié)的有符號或者無符號,轉(zhuǎn)換轉(zhuǎn)換為四個字節(jié)有符號 int類型
                  length += (digit & 0x7f) * multiplier;
                  multiplier *= 128;
             } while ((digit & 0x80) != 0);
          
              return length;
          }
          

          一般最后一個字節(jié)小于127(01111111),和0x80(10000000)進(jìn)行&操作,最終結(jié)果都為0,因此計算會終止。代理中間件和請求者,中間傳遞的是字節(jié)流Stream,自然要從流中讀取,逐一解析出來。

          那么如何將int類型長度解析為不確定的字節(jié)值呢?

          public static void main(String[] args) throws IOException {
              // 模擬服務(wù)器/客戶端寫入
             ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
             DataOutputStream dataOutputStream = new DataOutputStream(
                       arrayOutputStream);
          
              // 模擬服務(wù)器/客戶端解析
              length2Bytes(dataOutputStream, 128);
          }
          
          /**
          * int類型長度解析為1-4個字節(jié)
          * @param out
          * @param length
          * @throws IOException
          */
          private static void length2Bytes(OutputStream out, int length)
                   throws IOException {
              int val = length;
              do {
                   int digit = val % 128;
                  val = val / 128;
                   if (val > 0)
                       digit = digit | 0x80;
          
                  out.write(digit);
             } while (val > 0);
          }
          

          digit對val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 請注意:剩余長度,只在固定頭部中,無論是一個字節(jié),還是四個字節(jié),不能被算作可變頭部中。

          可變頭部

          固定頭部僅定義了消息類型和一些標(biāo)志位,一些消息的元數(shù)據(jù),需要放入可變頭部中。可變頭部內(nèi)容字節(jié)長度 + Playload/負(fù)荷字節(jié)長度 = 剩余長度,這個是需要牢記的。可變頭部,包含了協(xié)議名稱,版本號,連接標(biāo)志,用戶授權(quán),心跳時間等內(nèi)容,這部分和后面要講到的CONNECT消息類型,有重復(fù),暫時略過。

          Playload/消息體/負(fù)荷

          消息體主要是為配合固定/可變頭部命令(比如CONNECT可變頭部User name標(biāo)記若為1則需要在消息體中附加用戶名稱字符串)而存在。

          CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息體。PUBLISH的消息體以二進(jìn)制形式對待。

          請記住,MQTT協(xié)議只允許在PUBLISH類型消息體中使用自定義特性,在固定/可變頭部想加入自定義私有特性,就免了吧。這也是為了協(xié)議免于流于形式,變得很分裂也為了兼顧現(xiàn)有客戶端等。比如支持壓縮等,那就可以在Playload中定義數(shù)據(jù)支持,在應(yīng)用中進(jìn)行讀取處理。

          這部分會在后面詳細(xì)論述。

          消息標(biāo)識符/消息ID

          固定頭中的QoS level標(biāo)志值為1或2時才會在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可變頭中出現(xiàn)。

          一個16位無符號位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個特定方向(服務(wù)器發(fā)往客戶端為一個方向,客戶端發(fā)送到服務(wù)器端為另一個方向)的通信消息中必須唯一。比如客戶端發(fā)往服務(wù)器,有可能存在服務(wù)器發(fā)往客戶端會同時存在重復(fù),但不礙事。

          可變頭部中,需要兩個字節(jié)的順序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻譯成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左邊/上面,表示這是一個大端字節(jié)/網(wǎng)絡(luò)字節(jié)序,符合人的閱讀習(xí)慣,高位在最左邊。

          bit 7 6 5 4 3 2 1 0
            Message Identifier MSB
            Message Identifier LSB

          但凡如此表示的,都可以視為一個16位無符號short類型整數(shù),兩個字節(jié)表示。在JAVA中處理比較簡單:

          DataInputStream.readUnsignedShort
          

          或者

          in.read() * 0xFF + in.read();
          

          最大長度可為: 65535

          UTF-8編碼

          有關(guān)字符串,MQTT采用的是修改版的UTF-8編碼,一般形式為如下,需要牢記:

          bit 7 6 5 4 3 2 1 0
          byte 1 String Length MSB
          byte 2 String Length LSB
          bytes 3 ... Encoded Character Data

          比如AVA,使用writeUTF()方法寫入一串文字“OTWP”,頭兩個字節(jié)為一個完整的無符號數(shù)字,代表字符串字節(jié)長度,后面四個字節(jié)才是字符串真正的長度,共六個字節(jié):

          bit 7 6 5 4 3 2 1 0
          byte 1 Message Length MSB (0x00)
            0 0 0 0 0 0 0 0
          byte 2 Message Length LSB (0x04)
            0 0 0 0 0 1 0 0
          byte 3 'O' (0x4F)
            0 1 0 0 1 1 1 1
          byte 4 'T' (0x54)
            0 1 0 1 0 1 0 0
          byte 5 'W' (0x57)
            0 1 0 1 0 1 1 1
          byte 6 'P' (0x50)
            0 1 0 1 0 0 0 0

          這點(diǎn),在程序中,可不用單獨(dú)處理默認(rèn),直接使用readUTF()方法,可自動省去了處理字符串長度的麻煩。當(dāng)然,可以手動讀取字符串:

          // 模擬寫入
          dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
          ......
          // 模擬讀取 
          int decodedLength = dataInputStream.readUnsignedShort();//2 byte
          byte[] decodedString = new byte[decodedLength]; // 4 bytes
          dataInputStream.read(decodedString);
          String target = new String(decodedString, "UTF-8");
          

          等同于:

          String target = dataInputStream.readUTF();
          

          MQTT無論是可變頭部還是消息體中,只要是字符串部分,都是采用了修改版的UTF-8編碼,讀取和寫入,借助DataInputStream/DataOutputStream的幫助,一行語句,略去了手動處理的麻煩。

          小結(jié)

          總之,掌握固定頭部的QoS level、RETAIN標(biāo)記、可變頭部的Connect flags作用和意義,對總體理解MQTT作用很大。

          posted on 2014-02-07 17:35 nieyong 閱讀(39600) 評論(2)  編輯  收藏 所屬分類: MQTT

          評論

          # re: MQTT協(xié)議筆記之頭部信息 2014-11-17 20:59 金敏通

          可以的  回復(fù)  更多評論   

          # re: MQTT協(xié)議筆記之頭部信息 2014-12-26 15:37 光輝

          【一個16位無符號位的short類型值(值不能為 0,0做保留作為無效的消息ID),僅僅要求在一個特定方向(服務(wù)器發(fā)往客戶端為一個方向,客戶端發(fā)送到服務(wù)器端為另一個方向)的通信消息中必須唯一。比如客戶端發(fā)往服務(wù)器,有可能存在服務(wù)器發(fā)往客戶端會同時存在重復(fù),但不礙事。】
          ===========================
          剛才看了一下Paho.MQTT.Client(WebSocket)中發(fā)送messae時的messageIdentifier的實(shí)現(xiàn),這個id當(dāng)qos大于0時,
          并不是唯一的,它只是確保在已發(fā)送但未收到PUBACK 的in flight message中是唯一的,例如,當(dāng)發(fā)送了
          消息A時使用id(3),并且收到A的PUBACK了,那么下次發(fā)送的message的id仍然是3。另外,當(dāng)Id到一個最大
          值時,這個Id會被reset
          /* The largest message identifier allowed, may not be larger than 2**16 but
          * if set smaller reduces the maximum number of outbound messages allowed.
          */
          ClientImpl.prototype.maxMessageIdentifier = 65536;

          if (this._message_identifier === this.maxMessageIdentifier) {
          this._message_identifier = 1;
          }  回復(fù)  更多評論   

          公告

          所有文章皆為原創(chuàng),若轉(zhuǎn)載請標(biāo)明出處,謝謝~

          新浪微博,歡迎關(guān)注:

          導(dǎo)航

          <2014年2月>
          2627282930311
          2345678
          9101112131415
          16171819202122
          2324252627281
          2345678

          統(tǒng)計

          常用鏈接

          留言簿(58)

          隨筆分類(130)

          隨筆檔案(151)

          個人收藏

          最新隨筆

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 海伦市| 徐水县| 志丹县| 壤塘县| 神农架林区| 商水县| 定兴县| 盈江县| 潞西市| 眉山市| 宝清县| 诸城市| 永吉县| 朝阳市| 固阳县| 灵宝市| 邳州市| 宝坻区| 灯塔市| 沾益县| 福州市| 兴宁市| 安徽省| 肃北| 怀集县| 山东| 洪泽县| 周至县| 莱西市| 那坡县| 同仁县| 新民市| 乐陵市| 资源县| 万山特区| 和田市| 大安市| 彰武县| 壶关县| 饶河县| 景谷|