上一篇分析了serverbootstrap的启动,接下来分析netty的数据读取。
          在nioworker中,负责读取操作的是下面的read(SelectionKey k)方法。如果当前channel的(readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0,且此时
          ch.read(buff)<0,则判断客户端已经断开连接
          private boolean read(SelectionKey k) {
                  
          final SocketChannel ch = (SocketChannel) k.channel();
                  
          final NioSocketChannel channel = (NioSocketChannel) k.attachment();

                  
          final ReceiveBufferSizePredictor predictor =
                      channel.getConfig().getReceiveBufferSizePredictor();
                  
          //默認(rèn)1024個(gè)字節(jié)空間
                  final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

                  
          int ret = 0;
                  
          int readBytes = 0;
                  
          boolean failure = true;
                  
          //分配連續(xù)的1024個(gè)byte空間
                  ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
                  
          try {
                      
          while ((ret = ch.read(bb)) > 0) {
                          readBytes 
          += ret;
                          
          if (!bb.hasRemaining()) {
                              
          break;
                          }
                      }
                      failure 
          = false;
                  } 
          catch (ClosedChannelException e) {
                      
          // Can happen, and does not need a user attention.
                  } catch (Throwable t) {
                      fireExceptionCaught(channel, t);
                  }

                  
          if (readBytes > 0) {
                      bb.flip();

                      
          final ChannelBufferFactory bufferFactory =
                          channel.getConfig().getBufferFactory();
                      
          final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
                      buffer.setBytes(
          0, bb);
                      buffer.writerIndex(readBytes);

                      recvBufferPool.release(bb);

                      
          // Update the predictor.
                      predictor.previousReceiveBufferSize(readBytes);

                      
          //觸發(fā)消息接收事件,根據(jù)pipeline中upstreamhandler由上到下的順序,調(diào)用messageReceived方法
                      fireMessageReceived(channel, buffer);
                  } 
          else {
                      recvBufferPool.release(bb);
                  }

                  
          if (ret < 0 || failure) {
                      close(channel, succeededFuture(channel));
                      
          return false;
                  }

                  
          return true;
              }
              

          在pipelinefactory中的第一个upstreamhandler为DelimiterBasedFrameDecoder,继承自FrameDecoder
          public ChannelPipeline getPipeline() throws Exception {
                  
          // Create a default pipeline implementation.
                  ChannelPipeline pipeline = pipeline();

                  
          // Add the text line codec combination first,
                  pipeline.addLast("framer"new DelimiterBasedFrameDecoder(
                          
          8192, Delimiters.lineDelimiter()));
                  pipeline.addLast(
          "decoder"new StringDecoder());
                  pipeline.addLast(
          "encoder"new StringEncoder());

                  
          // and then business logic.
                  pipeline.addLast("handler"new TelnetServerHandler());

                  
          return pipeline;
              }
          会调用FrameDecoder的messageReceived
           
          public void messageReceived(
                      ChannelHandlerContext ctx, MessageEvent e) 
          throws Exception {

                  Object m 
          = e.getMessage();
                  
          if (!(m instanceof ChannelBuffer)) {
                      ctx.sendUpstream(e);
                      
          return;
                  }

                  ChannelBuffer input 
          = (ChannelBuffer) m;
                  
          if (!input.readable()) {
                      
          return;
                  }

                  ChannelBuffer cumulation 
          = cumulation(ctx);
                  
          if (cumulation.readable()) {
                      cumulation.discardReadBytes();
                      cumulation.writeBytes(input);
                      callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
                  } 
          else {
                      
          //這里調(diào)用子類的decode方法
                      callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
                      
          if (input.readable()) {
                          cumulation.writeBytes(input);
                      }
                  }
              }

           //在这个upstreamhandler中,会一直读取数据,直到遇到协议约定的结束标志才将messagereceived事件传给下一个upstreamhandler
           
          private void callDecode(
                      ChannelHandlerContext context, Channel channel,
                      ChannelBuffer cumulation, SocketAddress remoteAddress) 
          throws Exception {

                  
          while (cumulation.readable()) {
                      
          int oldReaderIndex = cumulation.readerIndex();
                      Object frame 
          = decode(context, channel, cumulation);
                      
          if (frame == null) {
                          
          if (oldReaderIndex == cumulation.readerIndex()) {
                              
          // Seems like more data is required.
                              
          // Let us wait for the next notification.
                              break;
                          } 
          else {
                              
          // Previous data has been discarded.
                              
          // Probably it is reading on.
                              continue;
                          }
                      } 
          else if (oldReaderIndex == cumulation.readerIndex()) {
                          
          throw new IllegalStateException(
                                  
          "decode() method must read at least one byte " +
                                  
          "if it returned a frame (caused by: " + getClass() + ")");
                      }
                      
          //將messagereceive事件傳給下個(gè)upstreamhandler
                      unfoldAndFireMessageReceived(context, remoteAddress, frame);
                  }
              }
          看子类的decode是如何判断数据读取完毕
          protected Object decode(
                      ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) 
          throws Exception {
                  
          // Try all delimiters and choose the delimiter which yields the shortest frame.
                  int minFrameLength = Integer.MAX_VALUE;
                  ChannelBuffer minDelim 
          = null;
                  
          //獲取\r\n的位置
                  for (ChannelBuffer delim: delimiters) {
                      
          int frameLength = indexOf(buffer, delim);
                      
          if (frameLength >= 0 && frameLength < minFrameLength) {
                          minFrameLength 
          = frameLength;
                          minDelim 
          = delim;
                      }
                  }
                  
          //如果找到\r\n,表明客戶端數(shù)據(jù)發(fā)送完畢
                  if (minDelim != null) {
                      
          int minDelimLength = minDelim.capacity();
                      ChannelBuffer frame;

                      
          if (discardingTooLongFrame) {
                          
          // We've just finished discarding a very large frame.
                          
          // Go back to the initial state.
                          discardingTooLongFrame = false;
                          buffer.skipBytes(minFrameLength 
          + minDelimLength);

                          
          // TODO Let user choose when the exception should be raised - early or late?
                          
          //      If early, fail() should be called when discardingTooLongFrame is set to true.
                          int tooLongFrameLength = this.tooLongFrameLength;
                          
          this.tooLongFrameLength = 0;
                          fail(ctx, tooLongFrameLength);
                          
          return null;
                      }

                      
          if (minFrameLength > maxFrameLength) {
                          
          // Discard read frame.
                          buffer.skipBytes(minFrameLength + minDelimLength);
                          fail(ctx, minFrameLength);
                          
          return null;
                      }

                      
          if (stripDelimiter) {
                          
          //這里讀取全部數(shù)據(jù)
                          frame = buffer.readBytes(minFrameLength);
                          buffer.skipBytes(minDelimLength);
                      } 
          else {
                          frame 
          = buffer.readBytes(minFrameLength + minDelimLength);
                      }

                      
          return frame;
                  } 
          else {
                      
          if (!discardingTooLongFrame) {
                          
          if (buffer.readableBytes() > maxFrameLength) {
                              
          // Discard the content of the buffer until a delimiter is found.
                              tooLongFrameLength = buffer.readableBytes();
                              buffer.skipBytes(buffer.readableBytes());
                              discardingTooLongFrame 
          = true;
                          }
                      } 
          else {
                          
          // Still discarding the buffer since a delimiter is not found.
                          tooLongFrameLength += buffer.readableBytes();
                          buffer.skipBytes(buffer.readableBytes());
                      }
                      
          return null;
                  }
              }
          因为unfold默认是false,会直接执行fireMessageReceived,调用下一个upstreamhandler,这里是stringdecoder,通过stringdecoder,将channelbuffer中的数据转为string
          然后再触发下一个upstreamhandler的messagereceive,这里是TelnetServerHandler
          public void messageReceived(
                      ChannelHandlerContext ctx, MessageEvent e) {

                  
          // Cast to a String first.
                  
          // We know it is a String because we put some codec in TelnetPipelineFactory.
                  String request = (String) e.getMessage();

                  
          // Generate and write a response.
                  String response;
                  
          boolean close = false;
                  
          if (request.length() == 0) {
                      response 
          = "Please type something."r"n";
                  } 
          else if (request.toLowerCase().equals("bye")) {
                      response 
          = "Have a good day!"r"n";
                      close 
          = true;
                  } 
          else {
                      response 
          = "Did you say '" + request + "'?"r"n";
                  }

                  
          // We do not need to write a ChannelBuffer here.
                  
          // We know the encoder inserted at TelnetPipelineFactory will do the conversion.
                  ChannelFuture future = e.getChannel().write(response);

                  
          // Close the connection after sending 'Have a good day!'
                  
          // if the client has sent 'bye'.
                  if (close) {
                      future.addListener(ChannelFutureListener.CLOSE);
                  }
              }

          数据读取分析完毕,接着继续分析服务器端数据的发送


          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 甘德县| 灵山县| 平原县| 渭源县| 曲阜市| 平阴县| 岢岚县| 莱西市| 泸定县| 色达县| 临夏县| 冕宁县| 威远县| 资中县| 湖北省| 长顺县| 漳浦县| 巴林右旗| 花莲县| 蓬安县| 陆川县| 龙江县| 渝中区| 温州市| 花莲县| 民权县| 磐石市| 抚顺县| 清新县| 黎城县| 夏津县| 若尔盖县| 长治县| 新巴尔虎左旗| 玛沁县| 太保市| 德阳市| 雅江县| 邻水| 米易县| 玛沁县|