netty3.2.3源碼分析-ClientBootstrap啟動(dòng)分析

          Posted on 2010-12-06 16:59 alex_zheng 閱讀(3272) 評論(0)  編輯  收藏 所屬分類: java
          在看完了server端的啟動(dòng),再來看client端的啟動(dòng)過程是怎么進(jìn)行的。例子是TelentServer對應(yīng)的TelentClient
          public class TelnetClient {

              
          public static void main(String[] args) throws Exception {
                  
                  ClientBootstrap bootstrap 
          = new ClientBootstrap(
                          
          new NioClientSocketChannelFactory(
                                  Executors.newCachedThreadPool(),
                                  Executors.newCachedThreadPool()));

                  
          // Configure the pipeline factory.
                  bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());

                  
          // Start the connection attempt.
                  ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

                  
          // Wait until the connection attempt succeeds or fails.
                  Channel channel = future.awaitUninterruptibly().getChannel();
                  
          if (!future.isSuccess()) {
                      future.getCause().printStackTrace();
                      bootstrap.releaseExternalResources();
                      
          return;
                  }

                  
              }
          }
          直接看connect方法
          public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

                  
          if (remoteAddress == null) {
                      
          throw new NullPointerException("remoteAddress");
                  }

                  ChannelPipeline pipeline;
                  
          try {
                      pipeline 
          = getPipelineFactory().getPipeline();
                  } 
          catch (Exception e) {
                      
          throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
                  }

                  
          // Set the options.
                  
          //NioClientSocketChannel構(gòu)造函數(shù)中會(huì)觸發(fā)channelopen
                  
          //TelnetClientPipelineFactory中的upstreamhandler沒有重寫channelOpen,這里只是一直往下傳遞該事件
                  Channel ch = getFactory().newChannel(pipeline);
                  ch.getConfig().setOptions(getOptions());

                  
          // Bind.
                  if (localAddress != null) {
                      ch.bind(localAddress);
                  }

                  
          // Connect.
                  return ch.connect(remoteAddress);
              }
          然后執(zhí)行ch.connect(remoteAddress);
          這里是NioClientSocketChannel-->NioSocketChannel-->AbstractChannel
          public ChannelFuture connect(SocketAddress remoteAddress) {
                 
          return Channels.connect(this, remoteAddress);
          }

          public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
                  
          if (remoteAddress == null) {
                      
          throw new NullPointerException("remoteAddress");
                  }
                  ChannelFuture future 
          = future(channel, true);
                  channel.getPipeline().sendDownstream(
          new DownstreamChannelStateEvent(
                          channel, future, ChannelState.CONNECTED, remoteAddress));
                  
          return future;
          }

          從TelnetClientPipelineFactory的pipeline中由下往上傳遞CONNECTED事件,這里只有一個(gè)StringEncoder-->OneToOneEncoder,其
          handleDownstream方法對該事件不做處理,往上傳遞該事件,執(zhí)行DefaultChannelHandlerContext.sendDownstream
          public void sendDownstream(ChannelEvent e) {
                      
          //在StringEncoder之前再?zèng)]有downstreamhandler
                      DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
                      
          if (prev == null) {
                          
          try {
                              getSink().eventSunk(DefaultChannelPipeline.
          this, e);
                          } 
          catch (Throwable t) {
                              notifyHandlerException(e, t);
                          }
                      } 
          else {
                          DefaultChannelPipeline.
          this.sendDownstream(prev, e);
                      }
                  }
          執(zhí)行NioClientSocketPipelineSink.eventSunk,其中會(huì)執(zhí)行
           private void connect(
                      
          final NioClientSocketChannel channel, final ChannelFuture cf,
                      SocketAddress remoteAddress) {
                  
          try {
                      
          //如果返回true,調(diào)用nioworker.register,開始啟動(dòng)nioworker線程處理該channel的讀寫
                      
          //否則,交給boss.register方法,在boss線程中完成連接
                      if (channel.socket.connect(remoteAddress)) {
                          channel.worker.register(channel, cf);
                      } 
          else {
                          
          //為當(dāng)前clientsocketchannel添加closed的listener
                          channel.getCloseFuture().addListener(new ChannelFutureListener() {
                              
          public void operationComplete(ChannelFuture f)
                                      
          throws Exception {
                                  
          if (!cf.isDone()) {
                                      cf.setFailure(
          new ClosedChannelException());
                                  }
                              }
                          });
                          cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                          channel.connectFuture 
          = cf;
                          
                          boss.register(channel);
                      }

                  } 
          catch (Throwable t) {
                      cf.setFailure(t);
                      fireExceptionCaught(channel, t);
                      channel.worker.close(channel, succeededFuture(channel));
                  }
              }

          執(zhí)行boss.register,在boss線程中確保該channel連接成功,這里會(huì)啟動(dòng)boss線程
          void register(NioClientSocketChannel channel) {
                      
          //在RegisterTask的run方法里注冊SelectionKey.OP_CONNECT
                      Runnable registerTask = new RegisterTask(this, channel);
                      
                          
          boolean offered = registerTaskQueue.offer(registerTask);
                          
          assert offered;
                      }

                      
          if (wakenUp.compareAndSet(falsetrue)) {
                          selector.wakeup();
                      }
                  }
          最后啟動(dòng)boss.run,其中processSelectedKeys里執(zhí)行connect
          private void connect(SelectionKey k) {
                      NioClientSocketChannel ch 
          = (NioClientSocketChannel) k.attachment();
                      
          try {
                          
          if (ch.socket.finishConnect()) {
                              k.cancel();
                              
          //連接成功,才在nioworker中啟動(dòng)一個(gè)新線程來處理該socketchannel的讀寫
                              ch.worker.register(ch, ch.connectFuture);
                          }
                      } 
          catch (Throwable t) {
                          ch.connectFuture.setFailure(t);
                          fireExceptionCaught(ch, t);
                          ch.worker.close(ch, succeededFuture(ch));
                      }
                  }

          之后就是交給nioworker線程來進(jìn)行數(shù)據(jù)的發(fā)送和接收了。

          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 祁东县| 连云港市| 永修县| 昭通市| 石狮市| 财经| 汽车| 万盛区| 屯昌县| 武冈市| 勃利县| 枣强县| 东城区| 常德市| 内丘县| 丽江市| 台安县| 阿克苏市| 大余县| 景泰县| 岱山县| 天峻县| 四子王旗| 保亭| 河曲县| 洮南市| 铜陵市| 屏山县| 博湖县| 中卫市| 三都| 巴彦淖尔市| 赣州市| 息烽县| 马龙县| 兴宁市| 通许县| 萝北县| 北辰区| 澄迈县| 福州市|