netty3.2.3源碼分析--服務器端發送數據分析

          Posted on 2010-12-04 14:54 alex_zheng 閱讀(1286) 評論(0)  編輯  收藏 所屬分類: java
          上一篇分析了服務器端讀取客戶發送的數據,這篇來看服務器端如何發送數據給客戶端,服務器往外發送數據是通過downstreamhandler從下到上執行
          發送從ChannelFuture future = e.getChannel().write(response)開始執行Channels下的
          public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
                  ChannelFuture future 
          = future(channel);
                  channel.getPipeline().sendDownstream(
                          
          new DownstreamMessageEvent(channel, future, message, remoteAddress));
                  
          return future;
           }

          telentpipeline中最下面一個downstreamhandler是stringencoder,最后執行OneToOneEncoder的handleDownstream
          public void handleDownstream(
                      ChannelHandlerContext ctx, ChannelEvent evt) 
          throws Exception {
                  
          if (!(evt instanceof MessageEvent)) {
                      ctx.sendDownstream(evt);
                      
          return;
                  }

                  MessageEvent e 
          = (MessageEvent) evt;
                  Object originalMessage 
          = e.getMessage();
                  Object encodedMessage 
          = encode(ctx, e.getChannel(), originalMessage);
                  
          if (originalMessage == encodedMessage) {
                      ctx.sendDownstream(evt);
                  } 
          else if (encodedMessage != null) {
                      
          //這里寫encode數據,DefaultChannelPipeline的sendDownstream
                      write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
                  }
              }
          DefaultChannelPipeline的sendDownstream方法
          public void sendDownstream(ChannelEvent e) {
                      DefaultChannelHandlerContext prev 
          = getActualDownstreamContext(this.prev);
                      
          if (prev == null) {
                          
          try {
                              
          //因為stringencoder是唯一一個downstreamhandler,這里執行NioServerSocketPipelineSink.eventSunk
                              getSink().eventSunk(DefaultChannelPipeline.this, e);
                          } 
          catch (Throwable t) {
                              notifyHandlerException(e, t);
                          }
                      } 
          else {
                          DefaultChannelPipeline.
          this.sendDownstream(prev, e);
                      }
                  }
          eventSunk方法會執行
          private void handleAcceptedSocket(ChannelEvent e) {
                  
          if (e instanceof ChannelStateEvent) {
                      ChannelStateEvent event 
          = (ChannelStateEvent) e;
                      NioSocketChannel channel 
          = (NioSocketChannel) event.getChannel();
                      ChannelFuture future 
          = event.getFuture();
                      ChannelState state 
          = event.getState();
                      Object value 
          = event.getValue();

                      
          switch (state) {
                      
          case OPEN:
                          
          if (Boolean.FALSE.equals(value)) {
                              channel.worker.close(channel, future);
                          }
                          
          break;
                      
          case BOUND:
                      
          case CONNECTED:
                          
          if (value == null) {
                              channel.worker.close(channel, future);
                          }
                          
          break;
                      
          case INTEREST_OPS:
                          channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                          
          break;
                      }
                  } 
          else if (e instanceof MessageEvent) {
                      MessageEvent event 
          = (MessageEvent) e;
                      NioSocketChannel channel 
          = (NioSocketChannel) event.getChannel();
                      
          //放入writerequestqueue隊列
                      boolean offered = channel.writeBuffer.offer(event);
                      
          assert offered;
                      
          //執行nioworker的writeFromUserCode,之后執行write0方法
                      channel.worker.writeFromUserCode(channel);
                  }
              }

          private void write0(NioSocketChannel channel) {
                  
          boolean open = true;
                  
          boolean addOpWrite = false;
                  
          boolean removeOpWrite = false;

                  
          long writtenBytes = 0;

                  
          final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
                  
          final SocketChannel ch = channel.socket;
                  
          //之前將channel放到了該隊列
                  final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
                  //默認嘗試16次寫
                  
          final int writeSpinCount = channel.getConfig().getWriteSpinCount();
                  
          synchronized (channel.writeLock) {
                      channel.inWriteNowLoop 
          = true;
                      
          for (;;) {
                          MessageEvent evt 
          = channel.currentWriteEvent;
                          SendBuffer buf;
                          
          if (evt == null) {
                      
          //從writebuffer中獲得一個writeevent
                              if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                                  removeOpWrite 
          = true;
                                  channel.writeSuspended 
          = false;
                                  
          break;
                              }
                              
                              channel.currentWriteBuffer 
          = buf = sendBufferPool.acquire(evt.getMessage());
                          } 
          else {
                              buf 
          = channel.currentWriteBuffer;
                          }

                          ChannelFuture future 
          = evt.getFuture();
                          
          try {
                              
          long localWrittenBytes = 0;
                              
          for (int i = writeSpinCount; i > 0; i --) {
                                  
          //發送數據給客戶端,執行PooledSendBuffer.transferTo
                                  localWrittenBytes = buf.transferTo(ch);
                                  
          if (localWrittenBytes != 0) {
                                      writtenBytes 
          += localWrittenBytes;
                                      
          break;
                                  }
                                  
          if (buf.finished()) {
                                      
          break;
                                  }
                              }

                              
          if (buf.finished()) {
                                  
          // Successful write - proceed to the next message.
                                  buf.release();
                                  channel.currentWriteEvent 
          = null;
                                  channel.currentWriteBuffer 
          = null;
                                  evt 
          = null;
                                  buf 
          = null;
                                  future.setSuccess();
                              } 
          else {
                                  
          // Not written fully - perhaps the kernel buffer is full.
                                  addOpWrite = true;
                                  channel.writeSuspended 
          = true;

                                  
          if (localWrittenBytes > 0) {
                                      
          // Notify progress listeners if necessary.
                                      future.setProgress(
                                              localWrittenBytes,
                                              buf.writtenBytes(), buf.totalBytes());
                                  }
                                  
          break;
                              }
                          } 
          catch (AsynchronousCloseException e) {
                              
          // Doesn't need a user attention - ignore.
                          } catch (Throwable t) {
                              buf.release();
                              channel.currentWriteEvent 
          = null;
                              channel.currentWriteBuffer 
          = null;
                              buf 
          = null;
                              evt 
          = null;
                              future.setFailure(t);
                              fireExceptionCaught(channel, t);
                              
          if (t instanceof IOException) {
                                  open 
          = false;
                                  close(channel, succeededFuture(channel));
                              }
                          }
                      }
                      channel.inWriteNowLoop 
          = false;
                  }
                  
          //觸發寫完成事件,執行的是DefaultChannelPipeline的sendUpstream,最后調用SimpleChannelUpstreamHandler.writeComplete
                  
          //pipeline中的upstreamhandler的writeComplete都未重寫,所以只是簡單的傳遞該事件
                  fireWriteComplete(channel, writtenBytes);

                  
          if (open) {
                      
          if (addOpWrite) {
                          setOpWrite(channel);
                      } 
          else if (removeOpWrite) {
                          clearOpWrite(channel);
                      }
                  }
              }

          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 循化| 日土县| 格尔木市| 祁门县| 青铜峡市| 大竹县| 上杭县| 易门县| 哈密市| 抚宁县| 博湖县| 彝良县| 和静县| 镇远县| 洪江市| 潜山县| 且末县| 电白县| 泸溪县| 麻阳| 韶山市| 临城县| 南川市| 大余县| 格尔木市| 赫章县| 通化市| 平果县| 沽源县| 绥滨县| 永胜县| 古浪县| 建德市| 嵊州市| 洛阳市| 兰考县| 肥城市| 栖霞市| 永丰县| 榆社县| 高阳县|