I want to fly higher
          programming Explorer
          posts - 114,comments - 263,trackbacks - 0
           1.CumulativeProtocolDecoder
                A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
               即解決'
          粘包'->即一次接收數(shù)據(jù)不能完全體現(xiàn)一個完整的消息數(shù)據(jù)->通過應用層數(shù)據(jù)協(xié)議,如協(xié)議中通過4字節(jié)描述消息大小或以結束符.

          2.CumulativeProtocolDecoder#decode實現(xiàn)
          /**
              * 1.緩存decode中的IoBuffer in至session的attribute
              * 2.循環(huán)調(diào)用doDecode方法直到其返回false
              * 3.解碼結束后緩存的buffer->壓縮
             
          */

             
          public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
                
          // 判斷傳輸層是否存在消息分片,如果不分片則直接doDecode.(可參考TCP/IP詳解)
                  if (!session.getTransportMetadata().hasFragmentation()) {
                     
          while (in.hasRemaining()) {
                         
          if (!doDecode(session, in, out)) {
                             
          break;
                          }

                      }


                     
          return;
                  }


                 
          boolean usingSessionBuffer = true;
                  IoBuffer buf
          = (IoBuffer) session.getAttribute(BUFFER);
                 
          // 如果session中有BUFFER這個attribute則直接執(zhí)行追加,否則直接用網(wǎng)絡層讀到的buffer
                  if (buf != null) {
                     
          boolean appended = false;
                     
          // Make sure that the buffer is auto-expanded.
                      if (buf.isAutoExpand()) {
                         
          try {
                              buf.put(in);
                              appended
          = true;
                          }
          catch (IllegalStateException e) {
                             
          // 可能調(diào)用了類似slice的方法,會使父緩沖區(qū)的自動擴展屬性失效(1.可參考AbstractIoBuffer#recapacityAllowed 2.可參考IoBuffer的實現(xiàn))
                          }
          catch (IndexOutOfBoundsException e) {
                             
          // 取消了自動擴展屬性(可參考IoBuffer實現(xiàn))
                          }

                      }


                     
          if (appended) {
             
          // 追加成功的話,直接flip
                          buf.flip();
                      }
          else {
              
          // 因為用了派生的方法(父子緩沖區(qū))如slice或取消了自動擴展而導致追加失敗->重新分配一個Buffer
                          buf.flip();
                          IoBuffer newBuf
          = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                          newBuf.order(buf.order());
                          newBuf.put(buf);
                          newBuf.put(in);
                          newBuf.flip();
                          buf
          = newBuf;

                         
          // 更新session屬性
                          session.setAttribute(BUFFER, buf);
                      }

                  }
          else {
             
          // 此else表示session無BUFFER屬性,直接賦值
                      buf = in;
                      usingSessionBuffer
          = false;
                  }


                 
          // 無限循環(huán)直到break 1.doDecode返回false 2.doDecode返回true且buf已無數(shù)據(jù) 3.異常
                  for (;;) {
                     
          int oldPos = buf.position();
                     
          boolean decoded = doDecode(session, buf, out);
                     
          if (decoded) {
                         
          if (buf.position() == oldPos) {
                             
          throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                          }


                         
          if (!buf.hasRemaining()) {
                             
          break;
                          }

                      }
          else {
                         
          break;
                      }

                  }


                 
          // 如果經(jīng)過decode,buffer依然有剩余數(shù)據(jù)則存儲到session->這樣下次decode的時候就可以從session取出buffer并執(zhí)行追加了
                  if (buf.hasRemaining()) {
                     
          if (usingSessionBuffer && buf.isAutoExpand()) {
                
          // 壓縮
                          buf.compact();
                      }
          else {
                          storeRemainingInSession(buf, session);
                      }

                  }
          else {
                     
          if (usingSessionBuffer) {
                          removeSessionBuffer(session);
                      }

                  }

              }

                      注.
                              1.doDecode在消息非完整的時候返回false. 
                              2.如果doDecode成功解碼出一條完整消息則返回true->如果此時buffer中依然有剩余數(shù)據(jù)則繼續(xù)執(zhí)行for->doDecode->直到buffer中的數(shù)據(jù)不足以解碼出一條成功消息返回false.或者恰恰有n條完整的消息->從for跳出.

          3.CumulativeProtocolDecoder example
              /**
                * 解碼以CRLF(回車換行)作為結束符的消息
                       
          */

             
          public class CrLfTerminatedCommandLineDecoder
                   
          extends CumulativeProtocolDecoder {

              
          private Command parseCommand(IoBuffer in) {
             
          // 實現(xiàn)將二進制byte[]轉(zhuǎn)為業(yè)務邏輯消息對象Command
                }


            
          // 只需實現(xiàn)doDecode方法即可
              protected boolean doDecode(
                       IoSession session, IoBuffer in, ProtocolDecoderOutput out)
                       
          throws Exception {

                 
          // 初始位置
                    int start = in.position();

                   
          // 查找'\r\n'標記
                    byte previous = 0;
                  
          while (in.hasRemaining()) {
                      
          byte current = in.get();

                       
          // 找到了\r\n
                        if (previous == '\r' && current == '\n') {
                          
          // Remember the current position and limit.
                            int position = in.position();
                         
          int limit = in.limit();
                         
          try {
                                in.position(start);
                              in.limit(position);
          //設置當前的位置為limit

                    
          // position和limit之間是一個完整的CRLF消息
                              out.write(parseCommand(in.slice()));//調(diào)用slice方法獲得positon和limit之間的子緩沖區(qū)->調(diào)用write方法加入消息隊列(因為網(wǎng)絡層一個包可能有多個完整消息)->后經(jīng)調(diào)用flush(遍歷消息隊列的消息)->nextFilter.messageReceived
          filter
                            }
          finally {
                              
          // 設置position為解碼后的position.limit設置為舊的limit
                               in.position(position);
                              in.limit(limit);
                            }


             
          // 直接返回true.因為在父類的decode方法中doDecode是循環(huán)執(zhí)行的直到不再有完整的消息返回false.
                         return true;
                     }


                      previous
          = current;
                   }


                   
          // 沒有找到\r\n,則重置position并返回false.使得父類decode->for跳出break.
                    in.position(start);

                   
          return false;
                }

            }

           4.
          DemuxingProtocolDecoder
              
           1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
               2.這是一個復合的decoder->多路復用->找到一個合適的MessageDecoder.(不同的消息協(xié)議)

               3.其doDecode實現(xiàn)為迭代候選的MessageDecoder列表->調(diào)用MessageDecoder#decodable方法->如果解碼結果為MessageDecoderResult#NOT_OK,則從候選列表移除;如果解碼結果為MessageDecoderResult#NEED_DATA,則保留該候選decoder并在更多數(shù)據(jù)到達的時候會再次調(diào)用decodable;如果返回結果為MessageDecoderResult#OK,則表明找到了正確的decoder;如果沒有剩下任何的候選decoder,則拋出異常.

              4.doDecode源碼
                protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
                       
          // 從Session中獲取一個State.State包含一個MessageDecoder數(shù)組以及一個當前的decoder
                      State state = getState(session);

                    
          // 如果當前decoder為空
                      if (state.currentDecoder == null) {
                          MessageDecoder[] decoders
          = state.decoders;
                         
          int undecodables = 0;
                 
                 
          // 遍歷decoder候選列表
                          for (int i = decoders.length - 1; i >= 0; i--) {
                              MessageDecoder decoder
          = decoders[i];
                             
          int limit = in.limit();
                             
          int pos = in.position();

                              MessageDecoderResult result;

                             
          try {
                          
          // 執(zhí)行decodable方法并返回result(decodable方法是檢查特定的buffer是否可以decoder解碼)
                                  result = decoder.decodable(session, in);
                              }
          finally {
                       
          // 一定要重置回舊的position和limit
                                  in.position(pos);
                                  in.limit(limit);
                              }


                             
          if (result == MessageDecoder.OK) {
                       
          // 如果返回結果為OK,則設置為state的當前decoder并break
                                  state.currentDecoder = decoder;
                                 
          break;
                              }
          else if (result == MessageDecoder.NOT_OK) {
                       
          // 如果返回結果為NOT_OK,則記錄undecodables數(shù)目++
                                  undecodables++;
                              }
          else if (result != MessageDecoder.NEED_DATA) {
                        
          // 如果結果都不是,即也不是NEED_DATA,則直接拋出異常
                                  throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
                              }

                          }


                 
          // 如果沒有找到合適的decoder,則拋出異常
                          if (undecodables == decoders.length) {
                             
          // Throw an exception if all decoders cannot decode data.
                              String dump = in.getHexDump();
                              in.position(in.limit());
          // 跳過這段數(shù)據(jù)
                              ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
                              e.setHexdump(dump);
                             
          throw e;
                          }

                 
                 
          // 迭代結束,如果還沒有找到合適的decoder則表示可能需要更多的數(shù)據(jù)->所以返回false->跳出父類的for-dodecode循環(huán)
                          if (state.currentDecoder == null) {
                             
          // Decoder is not determined yet (i.e. we need more data)
                              return false;
                          }

                      }


                    
          // 這里表示已找到合適的decoder,調(diào)用decode方法進行解碼二進制或者特定的協(xié)議數(shù)據(jù)為更高業(yè)務層的消息對象
                      try {
                          MessageDecoderResult result
          = state.currentDecoder.decode(session, in, out);
                         
          if (result == MessageDecoder.OK) {
                    
          // 重置為null
                              state.currentDecoder = null;
                             
          return true;
                          }
          else if (result == MessageDecoder.NEED_DATA) {
                             
          return false;
                          }
          else if (result == MessageDecoder.NOT_OK) {
                              state.currentDecoder
          = null;
                             
          throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
                          }
          else {
                              state.currentDecoder
          = null;
                             
          throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
                          }

                      }
          catch (Exception e) {
                          state.currentDecoder
          = null;
                         
          throw e;
                      }

                  }

          5.一個特定消息協(xié)議的編解碼的例子,{@link org.apache.mina.example.sumup}
              1.AbstractMessageEncoder
              /**
               * 1.編碼消息頭,消息體編碼由子類實現(xiàn).
               * 2.AbstractMessage中只有一個sequence字段
              
          */

             
          public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T> {
                
          // 類型字段
                  private final int type;

                 
          protected AbstractMessageEncoder(int type) {
                     
          this.type = type;
                  }


                 
          public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception {
                      IoBuffer buf
          = IoBuffer.allocate(16);
                      buf.setAutoExpand(
          true); // Enable auto-expand for easier encoding

                     
          // 編碼消息頭
                      buf.putShort((short) type);//type字段占2個字節(jié)(short)
                      buf.putInt(message.getSequence());// sequence字段占4個字節(jié)(int)

                     
          // 編碼消息體,由子類實現(xiàn)
                      encodeBody(session, message, buf);
                      buf.flip();
                      out.write(buf);
                  }


                 
          // 子類實現(xiàn)編碼消息體
                  protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
              }

              2.AbstractMessageDecoder
              /**
                  * 解碼消息頭,消息體由子類實現(xiàn)解碼
                 
          */

                 
          public abstract class AbstractMessageDecoder implements MessageDecoder {
                 
          private final int type;

                 
          private int sequence;

                 
          private boolean readHeader;

                 
          protected AbstractMessageDecoder(int type) {
                     
          this.type = type;
                  }


                 
          // 需覆寫decodable方法,檢查解碼結果
                  public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
                     
          // HEADER_LEN為type+sequence的長度,共占6個字節(jié).如果此時buffer剩余數(shù)據(jù)不足header的長度,則返回NEED_DATA的result.
                      if (in.remaining() < Constants.HEADER_LEN) {
                         
          return MessageDecoderResult.NEED_DATA;
                      }


                     
          // 第一個if判斷ok->讀取2字節(jié)(short),如果和type匹配則返回OK.
                      if (type == in.getShort()) {
                         
          return MessageDecoderResult.OK;
                      }


                     
          // 兩個if判斷都不ok,則返回NOT_OK
                      return MessageDecoderResult.NOT_OK;
                  }

                 
                          
          // 終極解碼
                  public MessageDecoderResult decode(IoSession session, IoBuffer in,
                          ProtocolDecoderOutput out)
          throws Exception {
                     
          // 如果header數(shù)據(jù)已ok且消息體數(shù)據(jù)不足則下次直接略過
                      if (!readHeader) {
                          in.getShort();
          // Skip 'type'.
                          sequence = in.getInt(); // Get 'sequence'.
                          readHeader = true;
                      }


                     
          // 解碼消息體,如果數(shù)據(jù)不足以解析消息體,則返回null
                      AbstractMessage m = decodeBody(session, in);
                     
          // 消息數(shù)據(jù)體數(shù)據(jù)不足->返回NEED_DATA
                      if (m == null) {
                         
          return MessageDecoderResult.NEED_DATA;
                      }
          else {
                          readHeader
          = false; // 成功解碼出一條完成消息,則重置readHeader->下次繼續(xù)讀取header
                      }

                      m.setSequence(sequence);
                      out.write(m);

                     
          return MessageDecoderResult.OK;
                  }


                 
          /**
                   * 數(shù)據(jù)完整不足以解析整個消息體則返回null
                  
          */

                 
          protected abstract AbstractMessage decodeBody(IoSession session,
                          IoBuffer in);
              }

              3.AddMessageEncoder
              /**
                                     * 1.AddMessage的encoder.AddMessage繼承自AbstractMessage,又增加了一個字段value
                                     * 2.該encoder的type為Constants.ADD,值為1
                  
          */

                 
          public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T> {
                 
          public AddMessageEncoder() {
                     
          super(Constants.ADD);
                  }


                  @Override
                 
          protected void encodeBody(IoSession session, T message, IoBuffer out) {                 // 實現(xiàn)了編碼消息體,向buffer追加了AddMessage的消息體value(4個字節(jié)-int)
                      out.putInt(message.getValue());
                  }


                 
          public void dispose() throws Exception {
                  }

              }


                 4.AddMessageDecoder
              /**
                  *  AddMessage的decoder.type為Constants.ADD(1)
                 
          */

                 
          public class AddMessageDecoder extends AbstractMessageDecoder {

                 
          public AddMessageDecoder() {
                     
          super(Constants.ADD);
                  }


                  @Override
                 
          protected AbstractMessage decodeBody(IoSession session, IoBuffer in) {                  // ADD_BODY_LEN為AddMessage的消息體長度(value屬性),即為4字節(jié)(int),如果此時不足4字節(jié),則返回null,表示body數(shù)據(jù)不足
                      if (in.remaining() < Constants.ADD_BODY_LEN) {
                         
          return null;
                      }


                      AddMessage m
          = new AddMessage();
                      m.setValue(in.getInt());
          // 讀取一個int
                      return m;
                  }


                 
          public void finishDecode(IoSession session, ProtocolDecoderOutput out)
                         
          throws Exception {
                  }

              }

          6.總結:使用CumulativeProtocolDecoder可以方便的進行特定消息協(xié)議的消息解碼并完美的解決了'粘包'問題.另外DemuxingProtocolDecoder結合MessageDecoder可更完美實現(xiàn)解碼方案.
          posted on 2013-12-02 18:55 landon 閱讀(3387) 評論(2)  編輯  收藏 所屬分類: Sources

          FeedBack:
          # re: apache-mina-2.07源碼筆記4-codec
          2013-12-03 09:38 | 鵬達鎖業(yè)
          謝謝博主分享。。。。。。。。。。。  回復  更多評論
            
          # re: apache-mina-2.07源碼筆記4-codec
          2013-12-05 17:26 | 左岸
          好東西啊,謝謝分享  回復  更多評論
            
          主站蜘蛛池模板: 江都市| 岳西县| 泰顺县| 津南区| 马边| 壶关县| 大悟县| 上虞市| 广水市| 甘孜县| 太仆寺旗| 庐江县| 威信县| 册亨县| 额敏县| 东乡| 新绛县| 金华市| 巴楚县| 沙湾县| 濮阳市| 永吉县| 苏尼特左旗| 哈尔滨市| 大埔区| 建阳市| 韩城市| 达拉特旗| 达日县| 长乐市| 佛坪县| 时尚| 桃源县| 措勤县| 米泉市| 西青区| 富阳市| 杭州市| 榆社县| 赤城县| 台州市|