I want to fly higher
          programming Explorer
          posts - 114,comments - 263,trackbacks - 0
          <2013年12月>
          24252627282930
          1234567
          891011121314
          15161718192021
          22232425262728
          2930311234

          常用鏈接

          留言簿(5)

          隨筆分類(161)

          隨筆檔案(114)

          文章分類(2)

          文章檔案(2)

          Alibaba

          Comprehensive

          Expert

          Game

          Java

          搜索

          •  

          積分與排名

          • 積分 - 599952
          • 排名 - 78

          最新評(píng)論

          閱讀排行榜

           1.CumulativeProtocolDecoder
                A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
               即解決'
          粘包'->即一次接收數(shù)據(jù)不能完全體現(xiàn)一個(gè)完整的消息數(shù)據(jù)->通過(guò)應(yīng)用層數(shù)據(jù)協(xié)議,如協(xié)議中通過(guò)4字節(jié)描述消息大小或以結(jié)束符.

          2.CumulativeProtocolDecoder#decode實(shí)現(xiàn)
          /**
              * 1.緩存decode中的IoBuffer in至session的attribute
              * 2.循環(huán)調(diào)用doDecode方法直到其返回false
              * 3.解碼結(jié)束后緩存的buffer->壓縮
             
          */

             
          public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
                
          // 判斷傳輸層是否存在消息分片,如果不分片則直接doDecode.(可參考TCP/IP詳解)
                  if (!session.getTransportMetadata().hasFragmentation()) {
                     
          while (in.hasRemaining()) {
                         
          if (!doDecode(session, in, out)) {
                             
          break;
                          }

                      }


                     
          return;
                  }


                 
          boolean usingSessionBuffer = true;
                  IoBuffer buf
          = (IoBuffer) session.getAttribute(BUFFER);
                 
          // 如果session中有BUFFER這個(gè)attribute則直接執(zhí)行追加,否則直接用網(wǎng)絡(luò)層讀到的buffer
                  if (buf != null) {
                     
          boolean appended = false;
                     
          // Make sure that the buffer is auto-expanded.
                      if (buf.isAutoExpand()) {
                         
          try {
                              buf.put(in);
                              appended
          = true;
                          }
          catch (IllegalStateException e) {
                             
          // 可能調(diào)用了類似slice的方法,會(huì)使父緩沖區(qū)的自動(dòng)擴(kuò)展屬性失效(1.可參考AbstractIoBuffer#recapacityAllowed 2.可參考IoBuffer的實(shí)現(xiàn))
                          }
          catch (IndexOutOfBoundsException e) {
                             
          // 取消了自動(dòng)擴(kuò)展屬性(可參考IoBuffer實(shí)現(xiàn))
                          }

                      }


                     
          if (appended) {
             
          // 追加成功的話,直接flip
                          buf.flip();
                      }
          else {
              
          // 因?yàn)橛昧伺缮姆椒?父子緩沖區(qū))如slice或取消了自動(dòng)擴(kuò)展而導(dǎo)致追加失敗->重新分配一個(gè)Buffer
                          buf.flip();
                          IoBuffer newBuf
          = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                          newBuf.order(buf.order());
                          newBuf.put(buf);
                          newBuf.put(in);
                          newBuf.flip();
                          buf
          = newBuf;

                         
          // 更新session屬性
                          session.setAttribute(BUFFER, buf);
                      }

                  }
          else {
             
          // 此else表示session無(wú)BUFFER屬性,直接賦值
                      buf = in;
                      usingSessionBuffer
          = false;
                  }


                 
          // 無(wú)限循環(huán)直到break 1.doDecode返回false 2.doDecode返回true且buf已無(wú)數(shù)據(jù) 3.異常
                  for (;;) {
                     
          int oldPos = buf.position();
                     
          boolean decoded = doDecode(session, buf, out);
                     
          if (decoded) {
                         
          if (buf.position() == oldPos) {
                             
          throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                          }


                         
          if (!buf.hasRemaining()) {
                             
          break;
                          }

                      }
          else {
                         
          break;
                      }

                  }


                 
          // 如果經(jīng)過(guò)decode,buffer依然有剩余數(shù)據(jù)則存儲(chǔ)到session->這樣下次decode的時(shí)候就可以從session取出buffer并執(zhí)行追加了
                  if (buf.hasRemaining()) {
                     
          if (usingSessionBuffer && buf.isAutoExpand()) {
                
          // 壓縮
                          buf.compact();
                      }
          else {
                          storeRemainingInSession(buf, session);
                      }

                  }
          else {
                     
          if (usingSessionBuffer) {
                          removeSessionBuffer(session);
                      }

                  }

              }

                      注.
                              1.doDecode在消息非完整的時(shí)候返回false. 
                              2.如果doDecode成功解碼出一條完整消息則返回true->如果此時(shí)buffer中依然有剩余數(shù)據(jù)則繼續(xù)執(zhí)行for->doDecode->直到buffer中的數(shù)據(jù)不足以解碼出一條成功消息返回false.或者恰恰有n條完整的消息->從for跳出.

          3.CumulativeProtocolDecoder example
              /**
                * 解碼以CRLF(回車換行)作為結(jié)束符的消息
                       
          */

             
          public class CrLfTerminatedCommandLineDecoder
                   
          extends CumulativeProtocolDecoder {

              
          private Command parseCommand(IoBuffer in) {
             
          // 實(shí)現(xiàn)將二進(jìn)制byte[]轉(zhuǎn)為業(yè)務(wù)邏輯消息對(duì)象Command
                }


            
          // 只需實(shí)現(xiàn)doDecode方法即可
              protected boolean doDecode(
                       IoSession session, IoBuffer in, ProtocolDecoderOutput out)
                       
          throws Exception {

                 
          // 初始位置
                    int start = in.position();

                   
          // 查找'\r\n'標(biāo)記
                    byte previous = 0;
                  
          while (in.hasRemaining()) {
                      
          byte current = in.get();

                       
          // 找到了\r\n
                        if (previous == '\r' && current == '\n') {
                          
          // Remember the current position and limit.
                            int position = in.position();
                         
          int limit = in.limit();
                         
          try {
                                in.position(start);
                              in.limit(position);
          //設(shè)置當(dāng)前的位置為limit

                    
          // position和limit之間是一個(gè)完整的CRLF消息
                              out.write(parseCommand(in.slice()));//調(diào)用slice方法獲得positon和limit之間的子緩沖區(qū)->調(diào)用write方法加入消息隊(duì)列(因?yàn)榫W(wǎng)絡(luò)層一個(gè)包可能有多個(gè)完整消息)->后經(jīng)調(diào)用flush(遍歷消息隊(duì)列的消息)->nextFilter.messageReceived
          filter
                            }
          finally {
                              
          // 設(shè)置position為解碼后的position.limit設(shè)置為舊的limit
                               in.position(position);
                              in.limit(limit);
                            }


             
          // 直接返回true.因?yàn)樵诟割惖膁ecode方法中doDecode是循環(huán)執(zhí)行的直到不再有完整的消息返回false.
                         return true;
                     }


                      previous
          = current;
                   }


                   
          // 沒(méi)有找到\r\n,則重置position并返回false.使得父類decode->for跳出break.
                    in.position(start);

                   
          return false;
                }

            }

           4.
          DemuxingProtocolDecoder
              
           1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
               2.這是一個(gè)復(fù)合的decoder->多路復(fù)用->找到一個(gè)合適的MessageDecoder.(不同的消息協(xié)議)

               3.其doDecode實(shí)現(xiàn)為迭代候選的MessageDecoder列表->調(diào)用MessageDecoder#decodable方法->如果解碼結(jié)果為MessageDecoderResult#NOT_OK,則從候選列表移除;如果解碼結(jié)果為MessageDecoderResult#NEED_DATA,則保留該候選decoder并在更多數(shù)據(jù)到達(dá)的時(shí)候會(huì)再次調(diào)用decodable;如果返回結(jié)果為MessageDecoderResult#OK,則表明找到了正確的decoder;如果沒(méi)有剩下任何的候選decoder,則拋出異常.

              4.doDecode源碼
                protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
                       
          // 從Session中獲取一個(gè)State.State包含一個(gè)MessageDecoder數(shù)組以及一個(gè)當(dāng)前的decoder
                      State state = getState(session);

                    
          // 如果當(dāng)前decoder為空
                      if (state.currentDecoder == null) {
                          MessageDecoder[] decoders
          = state.decoders;
                         
          int undecodables = 0;
                 
                 
          // 遍歷decoder候選列表
                          for (int i = decoders.length - 1; i >= 0; i--) {
                              MessageDecoder decoder
          = decoders[i];
                             
          int limit = in.limit();
                             
          int pos = in.position();

                              MessageDecoderResult result;

                             
          try {
                          
          // 執(zhí)行decodable方法并返回result(decodable方法是檢查特定的buffer是否可以decoder解碼)
                                  result = decoder.decodable(session, in);
                              }
          finally {
                       
          // 一定要重置回舊的position和limit
                                  in.position(pos);
                                  in.limit(limit);
                              }


                             
          if (result == MessageDecoder.OK) {
                       
          // 如果返回結(jié)果為OK,則設(shè)置為state的當(dāng)前decoder并break
                                  state.currentDecoder = decoder;
                                 
          break;
                              }
          else if (result == MessageDecoder.NOT_OK) {
                       
          // 如果返回結(jié)果為NOT_OK,則記錄undecodables數(shù)目++
                                  undecodables++;
                              }
          else if (result != MessageDecoder.NEED_DATA) {
                        
          // 如果結(jié)果都不是,即也不是NEED_DATA,則直接拋出異常
                                  throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
                              }

                          }


                 
          // 如果沒(méi)有找到合適的decoder,則拋出異常
                          if (undecodables == decoders.length) {
                             
          // Throw an exception if all decoders cannot decode data.
                              String dump = in.getHexDump();
                              in.position(in.limit());
          // 跳過(guò)這段數(shù)據(jù)
                              ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
                              e.setHexdump(dump);
                             
          throw e;
                          }

                 
                 
          // 迭代結(jié)束,如果還沒(méi)有找到合適的decoder則表示可能需要更多的數(shù)據(jù)->所以返回false->跳出父類的for-dodecode循環(huán)
                          if (state.currentDecoder == null) {
                             
          // Decoder is not determined yet (i.e. we need more data)
                              return false;
                          }

                      }


                    
          // 這里表示已找到合適的decoder,調(diào)用decode方法進(jìn)行解碼二進(jìn)制或者特定的協(xié)議數(shù)據(jù)為更高業(yè)務(wù)層的消息對(duì)象
                      try {
                          MessageDecoderResult result
          = state.currentDecoder.decode(session, in, out);
                         
          if (result == MessageDecoder.OK) {
                    
          // 重置為null
                              state.currentDecoder = null;
                             
          return true;
                          }
          else if (result == MessageDecoder.NEED_DATA) {
                             
          return false;
                          }
          else if (result == MessageDecoder.NOT_OK) {
                              state.currentDecoder
          = null;
                             
          throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
                          }
          else {
                              state.currentDecoder
          = null;
                             
          throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
                          }

                      }
          catch (Exception e) {
                          state.currentDecoder
          = null;
                         
          throw e;
                      }

                  }

          5.一個(gè)特定消息協(xié)議的編解碼的例子,{@link org.apache.mina.example.sumup}
              1.AbstractMessageEncoder
              /**
               * 1.編碼消息頭,消息體編碼由子類實(shí)現(xiàn).
               * 2.AbstractMessage中只有一個(gè)sequence字段
              
          */

             
          public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T> {
                
          // 類型字段
                  private final int type;

                 
          protected AbstractMessageEncoder(int type) {
                     
          this.type = type;
                  }


                 
          public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception {
                      IoBuffer buf
          = IoBuffer.allocate(16);
                      buf.setAutoExpand(
          true); // Enable auto-expand for easier encoding

                     
          // 編碼消息頭
                      buf.putShort((short) type);//type字段占2個(gè)字節(jié)(short)
                      buf.putInt(message.getSequence());// sequence字段占4個(gè)字節(jié)(int)

                     
          // 編碼消息體,由子類實(shí)現(xiàn)
                      encodeBody(session, message, buf);
                      buf.flip();
                      out.write(buf);
                  }


                 
          // 子類實(shí)現(xiàn)編碼消息體
                  protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
              }

              2.AbstractMessageDecoder
              /**
                  * 解碼消息頭,消息體由子類實(shí)現(xiàn)解碼
                 
          */

                 
          public abstract class AbstractMessageDecoder implements MessageDecoder {
                 
          private final int type;

                 
          private int sequence;

                 
          private boolean readHeader;

                 
          protected AbstractMessageDecoder(int type) {
                     
          this.type = type;
                  }


                 
          // 需覆寫decodable方法,檢查解碼結(jié)果
                  public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
                     
          // HEADER_LEN為type+sequence的長(zhǎng)度,共占6個(gè)字節(jié).如果此時(shí)buffer剩余數(shù)據(jù)不足header的長(zhǎng)度,則返回NEED_DATA的result.
                      if (in.remaining() < Constants.HEADER_LEN) {
                         
          return MessageDecoderResult.NEED_DATA;
                      }


                     
          // 第一個(gè)if判斷ok->讀取2字節(jié)(short),如果和type匹配則返回OK.
                      if (type == in.getShort()) {
                         
          return MessageDecoderResult.OK;
                      }


                     
          // 兩個(gè)if判斷都不ok,則返回NOT_OK
                      return MessageDecoderResult.NOT_OK;
                  }

                 
                          
          // 終極解碼
                  public MessageDecoderResult decode(IoSession session, IoBuffer in,
                          ProtocolDecoderOutput out)
          throws Exception {
                     
          // 如果header數(shù)據(jù)已ok且消息體數(shù)據(jù)不足則下次直接略過(guò)
                      if (!readHeader) {
                          in.getShort();
          // Skip 'type'.
                          sequence = in.getInt(); // Get 'sequence'.
                          readHeader = true;
                      }


                     
          // 解碼消息體,如果數(shù)據(jù)不足以解析消息體,則返回null
                      AbstractMessage m = decodeBody(session, in);
                     
          // 消息數(shù)據(jù)體數(shù)據(jù)不足->返回NEED_DATA
                      if (m == null) {
                         
          return MessageDecoderResult.NEED_DATA;
                      }
          else {
                          readHeader
          = false; // 成功解碼出一條完成消息,則重置readHeader->下次繼續(xù)讀取header
                      }

                      m.setSequence(sequence);
                      out.write(m);

                     
          return MessageDecoderResult.OK;
                  }


                 
          /**
                   * 數(shù)據(jù)完整不足以解析整個(gè)消息體則返回null
                  
          */

                 
          protected abstract AbstractMessage decodeBody(IoSession session,
                          IoBuffer in);
              }

              3.AddMessageEncoder
              /**
                                     * 1.AddMessage的encoder.AddMessage繼承自AbstractMessage,又增加了一個(gè)字段value
                                     * 2.該encoder的type為Constants.ADD,值為1
                  
          */

                 
          public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T> {
                 
          public AddMessageEncoder() {
                     
          super(Constants.ADD);
                  }


                  @Override
                 
          protected void encodeBody(IoSession session, T message, IoBuffer out) {                 // 實(shí)現(xiàn)了編碼消息體,向buffer追加了AddMessage的消息體value(4個(gè)字節(jié)-int)
                      out.putInt(message.getValue());
                  }


                 
          public void dispose() throws Exception {
                  }

              }


                 4.AddMessageDecoder
              /**
                  *  AddMessage的decoder.type為Constants.ADD(1)
                 
          */

                 
          public class AddMessageDecoder extends AbstractMessageDecoder {

                 
          public AddMessageDecoder() {
                     
          super(Constants.ADD);
                  }


                  @Override
                 
          protected AbstractMessage decodeBody(IoSession session, IoBuffer in) {                  // ADD_BODY_LEN為AddMessage的消息體長(zhǎng)度(value屬性),即為4字節(jié)(int),如果此時(shí)不足4字節(jié),則返回null,表示body數(shù)據(jù)不足
                      if (in.remaining() < Constants.ADD_BODY_LEN) {
                         
          return null;
                      }


                      AddMessage m
          = new AddMessage();
                      m.setValue(in.getInt());
          // 讀取一個(gè)int
                      return m;
                  }


                 
          public void finishDecode(IoSession session, ProtocolDecoderOutput out)
                         
          throws Exception {
                  }

              }

          6.總結(jié):使用CumulativeProtocolDecoder可以方便的進(jìn)行特定消息協(xié)議的消息解碼并完美的解決了'粘包'問(wèn)題.另外DemuxingProtocolDecoder結(jié)合MessageDecoder可更完美實(shí)現(xiàn)解碼方案.
          posted on 2013-12-02 18:55 landon 閱讀(3396) 評(píng)論(2)  編輯  收藏 所屬分類: Sources

          FeedBack:
          # re: apache-mina-2.07源碼筆記4-codec
          2013-12-03 09:38 | 鵬達(dá)鎖業(yè)
          謝謝博主分享。。。。。。。。。。。  回復(fù)  更多評(píng)論
            
          # re: apache-mina-2.07源碼筆記4-codec
          2013-12-05 17:26 | 左岸
          好東西啊,謝謝分享  回復(fù)  更多評(píng)論
            
          主站蜘蛛池模板: 深州市| 屏边| 西乌珠穆沁旗| 襄汾县| 宁波市| 新巴尔虎左旗| 安龙县| 鹤庆县| 旌德县| 汉中市| 阿荣旗| 布拖县| 新化县| 同德县| 霍林郭勒市| 巴塘县| 遂宁市| 荃湾区| 宕昌县| 华阴市| 六枝特区| 北碚区| 玉溪市| 莱芜市| 松溪县| 行唐县| 佛教| 阿合奇县| 鹿泉市| 湘乡市| 汤原县| 甘谷县| 武平县| 蒙自县| 唐山市| 临沂市| 德格县| 榕江县| 石阡县| 青海省| 白朗县|