上一篇分析了服務器端讀取客戶發送的數據,這篇來看服務器端如何發送數據給客戶端,服務器往外發送數據是通過downstreamhandler從下到上執行
          發送從ChannelFuture future = e.getChannel().write(response)開始,執行Channels下的write方法
          public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
                  ChannelFuture future 
          = future(channel);
                  channel.getPipeline().sendDownstream(
                          
          new DownstreamMessageEvent(channel, future, message, remoteAddress));
                  
          return future;
           }

          telnet pipeline中最下面一個downstreamhandler是stringencoder,最後執行OneToOneEncoder的handleDownstream
          public void handleDownstream(
                      ChannelHandlerContext ctx, ChannelEvent evt) 
          throws Exception {
                  
          if (!(evt instanceof MessageEvent)) {
                      ctx.sendDownstream(evt);
                      
          return;
                  }

                  MessageEvent e 
          = (MessageEvent) evt;
                  Object originalMessage 
          = e.getMessage();
                  Object encodedMessage 
          = encode(ctx, e.getChannel(), originalMessage);
                  
          if (originalMessage == encodedMessage) {
                      ctx.sendDownstream(evt);
                  } 
          else if (encodedMessage != null) {
                      
          //這里寫encode數(shù)據(jù),DefaultChannelPipeline的sendDownstream
                      write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
                  }
              }
          DefaultChannelPipeline的內部類DefaultChannelHandlerContext的sendDownstream方法
          public void sendDownstream(ChannelEvent e) {
                      DefaultChannelHandlerContext prev 
          = getActualDownstreamContext(this.prev);
                      
          if (prev == null) {
                          
          try {
                              
          //因為stringencoder是唯一一個downstreamhandler,這里執(zhí)行NioServerSocketPipelineSink.eventSunk
                              getSink().eventSunk(DefaultChannelPipeline.this, e);
                          } 
          catch (Throwable t) {
                              notifyHandlerException(e, t);
                          }
                      } 
          else {
                          DefaultChannelPipeline.
          this.sendDownstream(prev, e);
                      }
                  }
          eventSunk方法會執行handleAcceptedSocket
          private void handleAcceptedSocket(ChannelEvent e) {
                  
          if (e instanceof ChannelStateEvent) {
                      ChannelStateEvent event 
          = (ChannelStateEvent) e;
                      NioSocketChannel channel 
          = (NioSocketChannel) event.getChannel();
                      ChannelFuture future 
          = event.getFuture();
                      ChannelState state 
          = event.getState();
                      Object value 
          = event.getValue();

                      
          switch (state) {
                      
          case OPEN:
                          
          if (Boolean.FALSE.equals(value)) {
                              channel.worker.close(channel, future);
                          }
                          
          break;
                      
          case BOUND:
                      
          case CONNECTED:
                          
          if (value == null) {
                              channel.worker.close(channel, future);
                          }
                          
          break;
                      
          case INTEREST_OPS:
                          channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                          
          break;
                      }
                  } 
          else if (e instanceof MessageEvent) {
                      MessageEvent event 
          = (MessageEvent) e;
                      NioSocketChannel channel 
          = (NioSocketChannel) event.getChannel();
                      
          //放入writerequestqueue隊列
                      boolean offered = channel.writeBuffer.offer(event);
                      
          assert offered;
                      
          //執(zhí)行nioworker的writeFromUserCode,之后執(zhí)行write0方法
                      channel.worker.writeFromUserCode(channel);
                  }
              }

          /**
           * Drains the channel's queue of pending write events to its socket.
           *
           * <p>While holding {@code channel.writeLock}, the method repeatedly takes the current
           * (or next queued) {@code MessageEvent}, obtains a {@code SendBuffer} for it and calls
           * {@code transferTo} up to {@code writeSpinCount} times. A fully written buffer is
           * released and its future marked successful; a partially written one stops the loop
           * and requests that OP_WRITE be set. After the lock is released, a write-complete
           * event is fired with the total number of bytes written, and {@code setOpWrite} or
           * {@code clearOpWrite} is called if the channel is still considered open.
           *
           * @param channel the socket channel whose write buffer should be flushed
           */
          private void write0(NioSocketChannel channel) {
                  
          // Cleared only when an IOException forces the channel to be closed.
          boolean open = true;
                  
          // Set when a message could not be written completely.
          boolean addOpWrite = false;
                  
          // Set when the queue has been fully drained.
          boolean removeOpWrite = false;

                  
          // Total bytes written during this call, reported by fireWriteComplete below.
          long writtenBytes = 0;

                  
          final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
                  
          final SocketChannel ch = channel.socket;
                  
          // The MessageEvent was offered to this queue earlier (see handleAcceptedSocket).
                  final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
                  // Number of write attempts per pass; the original note says the default is 16.
                  
          final int writeSpinCount = channel.getConfig().getWriteSpinCount();
                  
          synchronized (channel.writeLock) {
                      channel.inWriteNowLoop 
          = true;
                      
          // The loop exits only via the two breaks below: queue empty, or partial write.
          // NOTE(review): neither catch clause breaks, so the loop continues after an exception.
          for (;;) {
                          MessageEvent evt 
          = channel.currentWriteEvent;
                          SendBuffer buf;
                          
          if (evt == null) {
                      
          // Take the next write event from the write buffer.
                              if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                                  removeOpWrite 
          = true;
                                  channel.writeSuspended 
          = false;
                                  
          break;
                              }
                              
                              channel.currentWriteBuffer 
          = buf = sendBufferPool.acquire(evt.getMessage());
                          } 
          else {
                              // Resume the buffer left over from a previous, incomplete write.
                              buf 
          = channel.currentWriteBuffer;
                          }

                          ChannelFuture future 
          = evt.getFuture();
                          
          try {
                              
          long localWrittenBytes = 0;
                              
          for (int i = writeSpinCount; i > 0; i --) {
                                  
          // Send data to the client; per the article this runs PooledSendBuffer.transferTo.
                                  localWrittenBytes = buf.transferTo(ch);
                                  
          if (localWrittenBytes != 0) {
                                      writtenBytes 
          += localWrittenBytes;
                                      
          break;
                                  }
                                  
          if (buf.finished()) {
                                      
          break;
                                  }
                              }

                              
          if (buf.finished()) {
                                  
          // Successful write - proceed to the next message.
                                  buf.release();
                                  channel.currentWriteEvent 
          = null;
                                  channel.currentWriteBuffer 
          = null;
                                  evt 
          = null;
                                  buf 
          = null;
                                  future.setSuccess();
                              } 
          else {
                                  
          // Not written fully - perhaps the kernel buffer is full.
                                  addOpWrite = true;
                                  channel.writeSuspended 
          = true;

                                  
          if (localWrittenBytes > 0) {
                                      
          // Notify progress listeners if necessary.
                                      future.setProgress(
                                              localWrittenBytes,
                                              buf.writtenBytes(), buf.totalBytes());
                                  }
                                  
          break;
                              }
                          } 
          catch (AsynchronousCloseException e) {
                              
          // Doesn't need a user attention - ignore.
                          } catch (Throwable t) {
                              // Drop the failed message, fail its future and report the error.
                              buf.release();
                              channel.currentWriteEvent 
          = null;
                              channel.currentWriteBuffer 
          = null;
                              buf 
          = null;
                              evt 
          = null;
                              future.setFailure(t);
                              fireExceptionCaught(channel, t);
                              
          // An I/O failure additionally closes the channel.
          if (t instanceof IOException) {
                                  open 
          = false;
                                  close(channel, succeededFuture(channel));
                              }
                          }
                      }
                      channel.inWriteNowLoop 
          = false;
                  }
                  
          // Fire the write-complete event; per the article this runs DefaultChannelPipeline's sendUpstream and finally calls SimpleChannelUpstreamHandler.writeComplete.
                  
          // Per the article, no upstream handler in the pipeline overrides writeComplete, so the event is simply passed along.
                  fireWriteComplete(channel, writtenBytes);

                  
          // Adjust write interest only if the channel was not closed above.
          if (open) {
                      
          if (addOpWrite) {
                          setOpWrite(channel);
                      } 
          else if (removeOpWrite) {
                          clearOpWrite(channel);
                      }
                  }
              }

          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 务川| 临澧县| 通城县| 古浪县| 澄城县| 巴青县| 漠河县| 汕头市| 大荔县| 昔阳县| 洪洞县| 广灵县| 漠河县| 天长市| 阜南县| 海阳市| 浦东新区| 泉州市| 交口县| 宁蒗| 惠州市| 水城县| 九江县| 密云县| 册亨县| 大连市| 阿尔山市| 丰宁| 漳浦县| 阳山县| 体育| 万安县| 旬阳县| 海兴县| 来凤县| 淮滨县| 西充县| 彭阳县| 乌鲁木齐市| 岚皋县| 米易县|