netty3.2.3源碼分析-ClientBootstrap啟動分析

          Posted on 2010-12-06 16:59 alex_zheng 閱讀(3272) 評論(0)  編輯  收藏 所屬分類: java
          在看完了server端的啟動,再來看client端的啟動過程是怎么進行的。例子是TelentServer對應的TelentClient
          public class TelnetClient {

              
          public static void main(String[] args) throws Exception {
                  
                  ClientBootstrap bootstrap 
          = new ClientBootstrap(
                          
          new NioClientSocketChannelFactory(
                                  Executors.newCachedThreadPool(),
                                  Executors.newCachedThreadPool()));

                  
          // Configure the pipeline factory.
                  bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());

                  
          // Start the connection attempt.
                  ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

                  
          // Wait until the connection attempt succeeds or fails.
                  Channel channel = future.awaitUninterruptibly().getChannel();
                  
          if (!future.isSuccess()) {
                      future.getCause().printStackTrace();
                      bootstrap.releaseExternalResources();
                      
          return;
                  }

                  
              }
          }
          直接看connect方法
          public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

                  
          if (remoteAddress == null) {
                      
          throw new NullPointerException("remoteAddress");
                  }

                  ChannelPipeline pipeline;
                  
          try {
                      pipeline 
          = getPipelineFactory().getPipeline();
                  } 
          catch (Exception e) {
                      
          throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
                  }

                  
          // Set the options.
                  
          //NioClientSocketChannel構造函數中會觸發channelopen
                  
          //TelnetClientPipelineFactory中的upstreamhandler沒有重寫channelOpen,這里只是一直往下傳遞該事件
                  Channel ch = getFactory().newChannel(pipeline);
                  ch.getConfig().setOptions(getOptions());

                  
          // Bind.
                  if (localAddress != null) {
                      ch.bind(localAddress);
                  }

                  
          // Connect.
                  return ch.connect(remoteAddress);
              }
          然后執行ch.connect(remoteAddress);
          這里是NioClientSocketChannel-->NioSocketChannel-->AbstractChannel
          public ChannelFuture connect(SocketAddress remoteAddress) {
                 
          return Channels.connect(this, remoteAddress);
          }

          public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
                  
          if (remoteAddress == null) {
                      
          throw new NullPointerException("remoteAddress");
                  }
                  ChannelFuture future 
          = future(channel, true);
                  channel.getPipeline().sendDownstream(
          new DownstreamChannelStateEvent(
                          channel, future, ChannelState.CONNECTED, remoteAddress));
                  
          return future;
          }

          從TelnetClientPipelineFactory的pipeline中由下往上傳遞CONNECTED事件,這里只有一個StringEncoder-->OneToOneEncoder,其
          handleDownstream方法對該事件不做處理,往上傳遞該事件,執行DefaultChannelHandlerContext.sendDownstream
          public void sendDownstream(ChannelEvent e) {
                      
          //在StringEncoder之前再沒有downstreamhandler
                      DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
                      
          if (prev == null) {
                          
          try {
                              getSink().eventSunk(DefaultChannelPipeline.
          this, e);
                          } 
          catch (Throwable t) {
                              notifyHandlerException(e, t);
                          }
                      } 
          else {
                          DefaultChannelPipeline.
          this.sendDownstream(prev, e);
                      }
                  }
          執行NioClientSocketPipelineSink.eventSunk,其中會執行
           private void connect(
                      
          final NioClientSocketChannel channel, final ChannelFuture cf,
                      SocketAddress remoteAddress) {
                  
          try {
                      
          //如果返回true,調用nioworker.register,開始啟動nioworker線程處理該channel的讀寫
                      
          //否則,交給boss.register方法,在boss線程中完成連接
                      if (channel.socket.connect(remoteAddress)) {
                          channel.worker.register(channel, cf);
                      } 
          else {
                          
          //為當前clientsocketchannel添加closed的listener
                          channel.getCloseFuture().addListener(new ChannelFutureListener() {
                              
          public void operationComplete(ChannelFuture f)
                                      
          throws Exception {
                                  
          if (!cf.isDone()) {
                                      cf.setFailure(
          new ClosedChannelException());
                                  }
                              }
                          });
                          cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                          channel.connectFuture 
          = cf;
                          
                          boss.register(channel);
                      }

                  } 
          catch (Throwable t) {
                      cf.setFailure(t);
                      fireExceptionCaught(channel, t);
                      channel.worker.close(channel, succeededFuture(channel));
                  }
              }

          執行boss.register,在boss線程中確保該channel連接成功,這里會啟動boss線程
          void register(NioClientSocketChannel channel) {
                      
          //在RegisterTask的run方法里注冊SelectionKey.OP_CONNECT
                      Runnable registerTask = new RegisterTask(this, channel);
                      
                          
          boolean offered = registerTaskQueue.offer(registerTask);
                          
          assert offered;
                      }

                      
          if (wakenUp.compareAndSet(falsetrue)) {
                          selector.wakeup();
                      }
                  }
          最后啟動boss.run,其中processSelectedKeys里執行connect
          private void connect(SelectionKey k) {
                      NioClientSocketChannel ch 
          = (NioClientSocketChannel) k.attachment();
                      
          try {
                          
          if (ch.socket.finishConnect()) {
                              k.cancel();
                              
          //連接成功,才在nioworker中啟動一個新線程來處理該socketchannel的讀寫
                              ch.worker.register(ch, ch.connectFuture);
                          }
                      } 
          catch (Throwable t) {
                          ch.connectFuture.setFailure(t);
                          fireExceptionCaught(ch, t);
                          ch.worker.close(ch, succeededFuture(ch));
                      }
                  }

          之后就是交給nioworker線程來進行數據的發送和接收了。

          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 松滋市| 水城县| 镇宁| 尚志市| 海南省| 葫芦岛市| 玉林市| 屏边| 曲沃县| 牡丹江市| 博白县| 琼结县| 定结县| 嘉禾县| 广灵县| 彭阳县| 博白县| 龙江县| 叶城县| 高青县| 宁陕县| 平泉县| 隆德县| 沭阳县| 关岭| 丽水市| 新郑市| 贵州省| 剑川县| 大庆市| 济源市| 剑阁县| 太保市| 牙克石市| 三亚市| 宁南县| 云安县| 广东省| 高密市| 乌兰浩特市| 孝感市|