上一篇分析了服務器端讀取客戶發送的數據,這篇來看服務器端如何發送數據給客戶端。服務器往外發送數據是通過 downstream handler 從下到上執行的。
          發送從 ChannelFuture future = e.getChannel().write(response) 開始,執行 Channels 下的:
          public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
                  ChannelFuture future 
          = future(channel);
                  channel.getPipeline().sendDownstream(
                          
          new DownstreamMessageEvent(channel, future, message, remoteAddress));
                  
          return future;
           }

          telnet pipeline 中最下面一個 downstream handler 是 StringEncoder,最後執行 OneToOneEncoder 的 handleDownstream:
          public void handleDownstream(
                      ChannelHandlerContext ctx, ChannelEvent evt) 
          throws Exception {
                  
          if (!(evt instanceof MessageEvent)) {
                      ctx.sendDownstream(evt);
                      
          return;
                  }

                  MessageEvent e 
          = (MessageEvent) evt;
                  Object originalMessage 
          = e.getMessage();
                  Object encodedMessage 
          = encode(ctx, e.getChannel(), originalMessage);
                  
          if (originalMessage == encodedMessage) {
                      ctx.sendDownstream(evt);
                  } 
          else if (encodedMessage != null) {
                      
          //這里寫encode數(shù)據(jù),DefaultChannelPipeline的sendDownstream
                      write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
                  }
              }
          DefaultChannelPipeline 內部 handler context(DefaultChannelHandlerContext)的 sendDownstream 方法:
          public void sendDownstream(ChannelEvent e) {
                      DefaultChannelHandlerContext prev 
          = getActualDownstreamContext(this.prev);
                      
          if (prev == null) {
                          
          try {
                              
          //因?yàn)閟tringencoder是唯一一個(gè)downstreamhandler,這里執(zhí)行NioServerSocketPipelineSink.eventSunk
                              getSink().eventSunk(DefaultChannelPipeline.this, e);
                          } 
          catch (Throwable t) {
                              notifyHandlerException(e, t);
                          }
                      } 
          else {
                          DefaultChannelPipeline.
          this.sendDownstream(prev, e);
                      }
                  }
          eventSunk 方法會執行 handleAcceptedSocket:
          private void handleAcceptedSocket(ChannelEvent e) {
                  
          if (e instanceof ChannelStateEvent) {
                      ChannelStateEvent event 
          = (ChannelStateEvent) e;
                      NioSocketChannel channel 
          = (NioSocketChannel) event.getChannel();
                      ChannelFuture future 
          = event.getFuture();
                      ChannelState state 
          = event.getState();
                      Object value 
          = event.getValue();

                      
          switch (state) {
                      
          case OPEN:
                          
          if (Boolean.FALSE.equals(value)) {
                              channel.worker.close(channel, future);
                          }
                          
          break;
                      
          case BOUND:
                      
          case CONNECTED:
                          
          if (value == null) {
                              channel.worker.close(channel, future);
                          }
                          
          break;
                      
          case INTEREST_OPS:
                          channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                          
          break;
                      }
                  } 
          else if (e instanceof MessageEvent) {
                      MessageEvent event 
          = (MessageEvent) e;
                      NioSocketChannel channel 
          = (NioSocketChannel) event.getChannel();
                      
          //放入writerequestqueue隊(duì)列
                      boolean offered = channel.writeBuffer.offer(event);
                      
          assert offered;
                      
          //執(zhí)行nioworker的writeFromUserCode,之后執(zhí)行write0方法
                      channel.worker.writeFromUserCode(channel);
                  }
              }

          /**
           * Writes the pending events in {@code channel.writeBuffer} to the
           * channel's socket.
           * <p>
           * While holding {@code channel.writeLock}, the loop takes the current
           * write event (or polls the next one from the queue), obtains a
           * {@code SendBuffer} for its message, and calls {@code transferTo} up to
           * {@code writeSpinCount} times. A fully written buffer is released and
           * its future marked successful. A partially written buffer stops the loop
           * and sets {@code addOpWrite}. An empty queue stops the loop and sets
           * {@code removeOpWrite}.
           * <p>
           * After the lock is released, {@code fireWriteComplete} is called with
           * the total bytes written. If no {@link IOException} occurred, the
           * write-interest flag is set or cleared according to those two flags.
           *
           * @param channel the channel whose queued events are written
           */
          private void write0(NioSocketChannel channel) {
                  
          boolean open = true;
                  
          boolean addOpWrite = false;
                  
          boolean removeOpWrite = false;

                  
          long writtenBytes = 0;

                  
          final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
                  
          final SocketChannel ch = channel.socket;
                  
          // Queue of pending write events; the article notes the event was put here earlier.
                  final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
                  // Number of write attempts per buffer (the article states the default is 16).
                  
          final int writeSpinCount = channel.getConfig().getWriteSpinCount();
                  
          synchronized (channel.writeLock) {
                      channel.inWriteNowLoop 
          = true;
                      
          for (;;) {
                          MessageEvent evt 
          = channel.currentWriteEvent;
                          SendBuffer buf;
                          
          if (evt == null) {
                      
          // No write in progress: take the next write event from the write buffer.
                              if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                                  removeOpWrite 
          = true;
                                  channel.writeSuspended 
          = false;
                                  
          break;
                              }
                              
                              channel.currentWriteBuffer 
          = buf = sendBufferPool.acquire(evt.getMessage());
                          } 
          else {
                              buf 
          = channel.currentWriteBuffer;
                          }

                          ChannelFuture future 
          = evt.getFuture();
                          
          try {
                              
          long localWrittenBytes = 0;
                              
          for (int i = writeSpinCount; i > 0; i --) {
                                  
          // Send the data to the client (per the article, via PooledSendBuffer.transferTo).
                                  localWrittenBytes = buf.transferTo(ch);
                                  
          if (localWrittenBytes != 0) {
                                      writtenBytes 
          += localWrittenBytes;
                                      
          break;
                                  }
                                  
          if (buf.finished()) {
                                      
          break;
                                  }
                              }

                              
          if (buf.finished()) {
                                  
          // Successful write - proceed to the next message.
                                  buf.release();
                                  channel.currentWriteEvent 
          = null;
                                  channel.currentWriteBuffer 
          = null;
                                  evt 
          = null;
                                  buf 
          = null;
                                  future.setSuccess();
                              } 
          else {
                                  
          // Not written fully - perhaps the kernel buffer is full.
                                  addOpWrite = true;
                                  channel.writeSuspended 
          = true;

                                  
          if (localWrittenBytes > 0) {
                                      
          // Notify progress listeners if necessary.
                                      future.setProgress(
                                              localWrittenBytes,
                                              buf.writtenBytes(), buf.totalBytes());
                                  }
                                  
          break;
                              }
                          } 
          catch (AsynchronousCloseException e) {
                              
          // Doesn't need a user attention - ignore.
          // NOTE(review): the current event and buffer are not cleared here,
          // so the next loop iteration picks the same event up again.
                          } catch (Throwable t) {
                              buf.release();
                              channel.currentWriteEvent 
          = null;
                              channel.currentWriteBuffer 
          = null;
                              buf 
          = null;
                              evt 
          = null;
                              future.setFailure(t);
                              fireExceptionCaught(channel, t);
                              
          if (t instanceof IOException) {
                                  open 
          = false;
                                  close(channel, succeededFuture(channel));
                              }
                          }
                      }
                      channel.inWriteNowLoop 
          = false;
                  }
                  
          // Fire the write-complete event. Per the article this goes through
          // DefaultChannelPipeline.sendUpstream and ends in SimpleChannelUpstreamHandler.writeComplete.
                  
          // Per the article, no upstream handler in the pipeline overrides writeComplete, so the event is simply passed along.
                  fireWriteComplete(channel, writtenBytes);

                  
          if (open) {
                      
          if (addOpWrite) {
                          setOpWrite(channel);
                      } 
          else if (removeOpWrite) {
                          clearOpWrite(channel);
                      }
                  }
              }

          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 色达县| 杭州市| 新津县| 朝阳县| 沙雅县| 彭山县| 化隆| 都昌县| 若尔盖县| 广河县| 房产| 元江| 泾阳县| 革吉县| 兴业县| 柘荣县| 汾阳市| 剑阁县| 平果县| 喀喇沁旗| 博乐市| 永城市| 南安市| 神池县| 年辖:市辖区| 绥阳县| 丹阳市| 北辰区| 长沙市| 呼伦贝尔市| 年辖:市辖区| 鄂尔多斯市| 宁化县| 淳安县| 济阳县| 马龙县| 开江县| 康马县| 泸溪县| 乐平市| 兰溪市|