netty3.2.3源碼分析--ServerBootstrap啟動分析

          Posted on 2010-12-01 21:37 alex_zheng 閱讀(4378) 評論(0)  編輯  收藏 所屬分類: java
          這里首先分析下ServerBootstrap的啟動過程,在netty中,channel可以看成是socketchannel的抽象
          channelpipeline里存放著channelhandler,channelpipeline根據不同的channelevent觸發對應的操作
          如channel的open,bind,connect等
          下面以TelnetServer為例來一步步看server啟動
          public static void main(String[] args) throws Exception {
                  
          // Configure the server.
                  
          // new NioServerSocketChannelFactory中初始化一個NioServerSocketPipelineSink,用來處理downstreamhandler
                  ServerBootstrap bootstrap = new ServerBootstrap(
                          
          new NioServerSocketChannelFactory(
                                  Executors.newCachedThreadPool(),
                                  Executors.newCachedThreadPool()));

                  
          // Set up the event pipeline factory.
                  bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());

                  
          // Bind and start to accept incoming connections.
                  bootstrap.bind(new InetSocketAddress(8080));
              }

          直接看serverbootstrap的bind方法
          /**
           * Starts binding a new server channel to {@code localAddress}.
           *
           * <p>As excerpted here the method builds the boss pipeline (the internal binder plus an
           * optional parent handler) and asks the channel factory for a new channel.
           * NOTE(review): the listing stops right after the channel is created; no return
           * statement is shown although the method is declared to return a {@code Channel}.
           *
           * @param localAddress the address to bind to; must not be {@code null}
           * @throws NullPointerException if {@code localAddress} is {@code null}
           */
          public Channel bind(final SocketAddress localAddress) {
          if (localAddress == null) {
          throw new NullPointerException("localAddress");
                  }

          // Queue for the bind result; the Binder offers the bind's ChannelFuture into it.
                  final BlockingQueue<ChannelFuture> futureQueue =
          new LinkedBlockingQueue<ChannelFuture>();

          // Binder extends SimpleChannelUpstreamHandler and handles channelOpen.
                  ChannelHandler binder = new Binder(localAddress, futureQueue);

          // Article note: in this example the parent handler is null.
                  ChannelHandler parentHandler = getParentHandler();

          // Create the boss pipeline (a DefaultChannelPipeline per the article).
          // Before the port is bound it only contains the binder, an upstream handler.
                  ChannelPipeline bossPipeline = pipeline();

          // Article note: addLast on DefaultChannelPipeline creates a
          // DefaultChannelHandlerContext; the contexts form a linked list, and at this
          // point the list holds only the binder's context.
                  bossPipeline.addLast("binder", binder);

          if (parentHandler != null) {
                      bossPipeline.addLast(
          "userHandler", parentHandler);
                  }

          // Everything starts here; per the article getFactory() is the
          // NioServerSocketChannelFactory passed to the bootstrap.
                  Channel channel = getFactory().newChannel(bossPipeline);

              }

          NioServerSocketChannelFactory.newChannel(ChannelPipeline pipeline)如下
           public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
                  
          //初始化一個NioServerSocketChannel,pipeline中放的是binder,sink是NioServerSocketPipelineSink
                  return new NioServerSocketChannel(this, pipeline, sink);
           }

          在NioServerSocketChannel的構造函數中可以看到這么一句:fireChannelOpen(this);,該方法定義在Channels類中,如下
           
          public static void fireChannelOpen(Channel channel) {
                  
          // Notify the parent handler.
                  if (channel.getParent() != null) {
                      fireChildChannelStateChanged(channel.getParent(), channel);
                  }
                  
          //這里調用DefaultChannelPipeline的sendUpstream方法
                  channel.getPipeline().sendUpstream(
                          
          new UpstreamChannelStateEvent(
                                  channel, ChannelState.OPEN, Boolean.TRUE));
              }
          DefaultChannelPipeline.sendUpstream(ChannelEvent e)
          public void sendUpstream(ChannelEvent e) {
                  
          //this.head==binder
                  DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
                  
          if (head == null) {
                      logger.warn(
                              
          "The pipeline contains no upstream handlers; discarding: " + e);
                      
          return;
                  }

                  sendUpstream(head, e);
              }

          執行
           
          void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
                  
          try {
                  
          //ctx.getHandler()==binder-->SimpleChannelUpstreamHandler
                      ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
                  } 
          catch (Throwable t) {
                      notifyHandlerException(e, t);
                  }
              }
          這里會在SimpleChannelUpstreamHandler.handleUpstream(ctx, e);中調用binder的channelOpen
          public void channelOpen(
                          ChannelHandlerContext ctx,
                          ChannelStateEvent evt) {

                      
          try {
                          
          //設置NioServerSocketChannel的DefaultServerSocketChannelConfig的pipelinetfactory
                          
          //在之后的線程分發中會去取該factory的pipeline,即TelnetServerPipelineFactory中的pipeline
                          evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
                  
                      } 
          finally {
                          
                          ctx.sendUpstream(evt);
                      }
                      
          //執行NioServerSocketChannel.bind,最終會調用Channels.bind(Channel channel, SocketAddress localAddress)
                      boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
                      
          assert finished;
                  }
          Channels.bind方法如下:
           
          public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
                  
          if (localAddress == null) {
                      
          throw new NullPointerException("localAddress");
                  }
                  ChannelFuture future 
          = future(channel);
                  
          //又調用了DefaultChannelPipeline的senddownstream,對應事件是bound
                  channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
                          channel, future, ChannelState.BOUND, localAddress));
                  
          return future;
          }
          DefaultChannelPipeline的senddownstream
           
          public void sendDownstream(ChannelEvent e) {
                  DefaultChannelHandlerContext tail 
          = getActualDownstreamContext(this.tail);
                  
          if (tail == null) {
                      
          try {
                          getSink().eventSunk(
          this, e);
                          
          return;
                      } 
          catch (Throwable t) {
                          notifyHandlerException(e, t);
                          
          return;
                      }
                  }

                  sendDownstream(tail, e);
              }
          從getActualDownstreamContext返回的是null,所以上面會執行 getSink().eventSunk(this, e);
          DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
                  
          if (ctx == null) {
                      
          return null;
                  }

                  DefaultChannelHandlerContext realCtx 
          = ctx;

                  
          //Binder是upstream,這里返回null
                  while (!realCtx.canHandleDownstream()) {
                      realCtx 
          = realCtx.prev;
                      
          if (realCtx == null) {
                          
          return null;
                      }
                  }

                  
          return realCtx;
              }
          sendDownstream將執行 getSink().eventSunk(this, e);
          getSink()獲得的是NioServerSocketPipelineSink,
          public void eventSunk(
                      ChannelPipeline pipeline, ChannelEvent e) 
          throws Exception {
                  Channel channel 
          = e.getChannel();
                  
          if (channel instanceof NioServerSocketChannel) {
                      handleServerSocket(e);
                  } 
          else if (channel instanceof NioSocketChannel) {
                      handleAcceptedSocket(e);
                  }
              }

           
          private void handleServerSocket(ChannelEvent e) {
                  
          if (!(e instanceof ChannelStateEvent)) {
                      
          return;
                  }

                  ChannelStateEvent event 
          = (ChannelStateEvent) e;
                  NioServerSocketChannel channel 
          =
                      (NioServerSocketChannel) event.getChannel();
                  ChannelFuture future 
          = event.getFuture();
                  ChannelState state 
          = event.getState();
                  Object value 
          = event.getValue();
                 
          //根據new DownstreamChannelStateEvent(channel, future, ChannelState.BOUND, localAddress)
                  switch (state) {
                  
          case OPEN:
                      
          if (Boolean.FALSE.equals(value)) {
                          close(channel, future);
                      }
                      
          break;
                  
          case BOUND:
                      
          if (value != null) {
                          
          //在這里完成socketAddress綁定
                          bind(channel, future, (SocketAddress) value);
                      } 
          else {
                          close(channel, future);
                      }
                      
          break;
                  }
              }

          對應的NioServerSocketPipelineSink.bind方法
          /**
           * Binds the server socket to {@code localAddress}, completes {@code future}, fires the
           * channel-bound event and submits a {@code Boss} task to the boss executor.
           *
           * <p>Any {@code Throwable} fails the future and is reported through
           * {@code fireExceptionCaught}. If the socket was bound but the boss task was not
           * started, the channel is closed again.
           */
          private void bind(
                      NioServerSocketChannel channel, ChannelFuture future,
                      SocketAddress localAddress) {

          // Track how far we got so the finally block can undo a half-finished bind.
          boolean bound = false;
          boolean bossStarted = false;
          try {
                      channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
                      bound 
          = true;

                      future.setSuccess();

          // Fire the channel-bound upstream event.
                      fireChannelBound(channel, channel.getLocalAddress());

                      Executor bossExecutor 
          =
                          ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
          // Run the Boss task on the boss executor, wrapped so its thread gets a descriptive name.
                      bossExecutor.execute(
          new IoWorkerRunnable(
          new ThreadRenamingRunnable(
          new Boss(channel),
          "New I/O server boss #" + id +
          " (" + channel + ')')));
                      bossStarted 
          = true;
                  } 
          catch (Throwable t) {
                      future.setFailure(t);
                      fireExceptionCaught(channel, t);
                  } 
          finally {
          // Bound but no boss to accept connections: release the socket again.
          if (!bossStarted && bound) {
                          close(channel, future);
                      }
                  }
              }
          先來看Channels.fireChannelBound方法做了什么
           
          public static void fireChannelBound(Channel channel, SocketAddress localAddress) {
                  
          //channel.getPipeline()的DefaultChannelPipeline中只有一個binder
                  
          //這里調用SimpleChannelUpstreamHandler的handleUpstream中的hannelBound(ctx, evt);
                  
                  channel.getPipeline().sendUpstream(
                          
          new UpstreamChannelStateEvent(
                                  channel, ChannelState.BOUND, localAddress));
              }

          接著看bind方法
          // Excerpt repeated from NioServerSocketPipelineSink.bind: obtain the boss executor
          // from the channel's factory.
          Executor bossExecutor =
                          ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;

          // Start a boss task on the boss executor.
          // That boss thread hands newly accepted client connections over to the workers
          // (the article states the number of workers is CPU*2; not shown in this excerpt).
                      bossExecutor.execute(
          new IoWorkerRunnable(
          new ThreadRenamingRunnable(
          new Boss(channel),
          "New I/O server boss #" + id +
          " (" + channel + ')')));
          在new Boss的時候,注冊channel的accept事件
          /**
           * Opens a selector and registers the server socket channel with it for
           * {@code OP_ACCEPT}. If registration fails the selector is closed before the
           * exception propagates. On success the selector is also stored on the channel.
           *
           * @throws IOException if opening the selector or registering the channel fails
           */
          Boss(NioServerSocketChannel channel) throws IOException {
          this.channel = channel;

                      selector 
          = Selector.open();

          boolean registered = false;
          try {
                          // Register interest in accept events on the listening socket.
                          channel.socket.register(selector, SelectionKey.OP_ACCEPT);
                          registered 
          = true;
                      } 
          finally {
          // Do not leak the selector when register() threw.
          if (!registered) {
                              closeSelector();
                          }
                      }

                      channel.selector 
          = selector;
                  }

          最終調用Boss.run()

          public void run() {
                      
          //獲得當前boss線程,mainreactor
                      final Thread currentThread = Thread.currentThread();

                      channel.shutdownLock.lock();
                      
          try {
                          
          for (;;) {
                              
          try {
                                  
          if (selector.select(1000> 0) {
                                      selector.selectedKeys().clear();
                                  }
                                  
          //接收新的客戶端連接
                                  SocketChannel acceptedSocket = channel.socket.accept();
                                  
          if (acceptedSocket != null) {
                                      
          //分派當前連接給workerexcutor,即subreactor
                                      registerAcceptedChannel(acceptedSocket, currentThread);
                                  }
                              }
                          }
                      } 
          finally {
                          channel.shutdownLock.unlock();
                          closeSelector();
                      }
                  }

          /**
           * Wraps a freshly accepted socket in a {@code NioAcceptedSocketChannel} with a
           * pipeline from the server channel's pipeline factory and registers it with the
           * next worker.
           *
           * <p>NOTE(review): the try block is shown without its catch/finally clauses; they
           * appear to have been omitted from this excerpt.
           */
          private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
          try {
          // Obtain the user's pipeline. The factory was stored by the first statement of
          // Binder.channelOpen:
          // evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
          // Up to this point the only pipeline involved was the boss pipeline containing
          // just the Binder. From here on each NioAcceptedSocketChannel gets a pipeline
          // produced by that factory (TelnetServerPipelineFactory in the example).
                          ChannelPipeline pipeline =
                              channel.getConfig().getPipelineFactory().getPipeline();

          // The NioWorker plays the "sub reactor" role.
                          NioWorker worker = nextWorker();
          // No future is passed for accepted channels (last argument is null).
                          worker.register(
          new NioAcceptedSocketChannel(
                                  channel.getFactory(), pipeline, channel,
                                  NioServerSocketPipelineSink.
          this, acceptedSocket,
                                  worker, currentThread), 
          null);
                      } 
                  }

          這里調用NioWorker.register
          void register(NioSocketChannel channel, ChannelFuture future) {

                  
          boolean server = !(channel instanceof NioClientSocketChannel);
                  
          //初始化新的task,將當前accept的socketchannel綁定到nioworker的selecortkey的attch
                  Runnable registerTask = new RegisterTask(channel, future, server);
                  Selector selector;

                  
          synchronized (startStopLock) {
                      
          if (!started) {
                          
          // Open a selector if this worker didn't start yet.
                          try {
                              
          this.selector = selector = Selector.open();
                          } 
          catch (Throwable t) {
                              
          throw new ChannelException(
                                      
          "Failed to create a selector.", t);
                          }

                          
          // Start the worker thread with the new Selector.
                          String threadName =
                              (server 
          ? "New I/O server worker #"
                                      : 
          "New I/O client worker #"+ bossId + '-' + id;

                          
          boolean success = false;
                          
          try {
                          
          //啟動一個線程來處理該連接
                              executor.execute(
                                      
          new IoWorkerRunnable(
                                              
          new ThreadRenamingRunnable(this, threadName)));
                              success 
          = true;
                          } 
          finally {
                              
          if (!success) {
                                  
          // Release the Selector if the execution fails.
                                  try {
                                      selector.close();
                                  } 
          catch (Throwable t) {
                                      logger.warn(
          "Failed to close a selector.", t);
                                  }
                                  
          this.selector = selector = null;
                                  
          // The method will return to the caller at this point.
                              }
                          }
                      } 
          else {
                          
          // Use the existing selector if this worker has been started.
                          selector = this.selector;
                      }

                      
          assert selector != null && selector.isOpen();

                      started 
          = true;
                      
          //加入到任務隊列
                      boolean offered = registerTaskQueue.offer(registerTask);
                      
          assert offered;
                  }

                  
          if (wakenUp.compareAndSet(falsetrue)) {
                      selector.wakeup();
                  }
              }
          來看registertask的run方法
          /**
           * Registers the task's channel with the worker's selector.
           *
           * <p>If either address of the channel is unavailable, the future (when present) is
           * failed with {@code ClosedChannelException} and the channel is closed. Otherwise the
           * socket is registered with the channel's raw interest ops and the channel itself as
           * attachment. For non-server (client) channels the bound/connected events are fired
           * afterwards.
           *
           * @throws ChannelException if registration fails with an {@code IOException} other
           *         than {@code ClosedChannelException}
           */
          public void run() {
                      SocketAddress localAddress = channel.getLocalAddress();
                      SocketAddress remoteAddress = channel.getRemoteAddress();
                      // A missing address is treated as "channel already closed".
                      if (localAddress == null || remoteAddress == null) {
                          if (future != null) {
                              future.setFailure(new ClosedChannelException());
                          }
                          close(channel, succeededFuture(channel));
                          return;
                      }

                      try {
                          // Server-side channels are switched to non-blocking mode here.
                          if (server) {
                              channel.socket.configureBlocking(false);
                          }

                          synchronized (channel.interestOpsLock) {
                              // Register the socket channel with the selector using its current
                              // interest ops (read interest, per the article).
                              channel.socket.register(
                                      selector, channel.getRawInterestOps(), channel);
                          }
                          if (future != null) {
                              channel.setConnected();
                              future.setSuccess();
                          }
                      } catch (IOException e) {
                          if (future != null) {
                              future.setFailure(e);
                          }
                          close(channel, succeededFuture(channel));
                          // A channel that was closed meanwhile is not reported as an error.
                          if (!(e instanceof ClosedChannelException)) {
                              throw new ChannelException(
                                      "Failed to register a socket to the selector.", e);
                          }
                      }

                      // Client channels: fire the bound event unless the user bound the channel
                      // manually, then always fire the connected event.
                      if (!server) {
                          if (!((NioClientSocketChannel) channel).boundManually) {
                              fireChannelBound(channel, localAddress);
                          }
                          fireChannelConnected(channel, remoteAddress);
                      }
                  }

          其中executor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable(this, threadName)));
          這里的this指向當前的nioworker,調用nioworker.run
          /**
           * Selector loop of this worker.
           *
           * <p>Each iteration resets {@code wakenUp}, selects, then processes the register-task
           * queue, the write-task queue and the selected keys. When the selector has no keys,
           * the loop exits once that state has persisted for a second pass (or the executor is
           * shut down) and the register-task queue is still empty; it then marks the worker
           * as stopped and closes the selector. Unexpected exceptions are logged and followed
           * by a one-second sleep.
           */
          public void run() {
          // Remember the thread that runs this worker.
                  thread = Thread.currentThread();

          boolean shutdown = false;
                  Selector selector 
          = this.selector;
          for (;;) {
                      wakenUp.set(
          false);

          if (CONSTRAINT_LEVEL != 0) {
                          selectorGuard.writeLock().lock();
          // This empty synchronization block prevents the selector
          // from acquiring its lock.
                          selectorGuard.writeLock().unlock();
                      }

          try {
                          SelectorUtil.select(selector);

          // 'wakenUp.compareAndSet(false, true)' is always evaluated
          // before calling 'selector.wakeup()' to reduce the wake-up
          // overhead. (Selector.wakeup() is an expensive operation.)
          //
          // However, there is a race condition in this approach.
          // The race condition is triggered when 'wakenUp' is set to
          // true too early.
          //
          // 'wakenUp' is set to true too early if:
          // 1) Selector is waken up between 'wakenUp.set(false)' and
          //    'selector.select()'. (BAD)
          // 2) Selector is waken up between 'selector.select()' and
          //    'if (wakenUp.get()) { ... }'. (OK)
          //
          // In the first case, 'wakenUp' is set to true and the
          // following 'selector.select()' will wake up immediately.
          // Until 'wakenUp' is set to false again in the next round,
          // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
          // any attempt to wake up the Selector will fail, too, causing
          // the following 'selector.select()' call to block
          // unnecessarily.
          //
          // To fix this problem, we wake up the selector again if wakenUp
          // is true immediately after selector.select().
          // It is inefficient in that it wakes up the selector for both
          // the first case (BAD - wake-up required) and the second case
          // (OK - no wake-up required).

          if (wakenUp.get()) {
                              selector.wakeup();
                          }

                          cancelledKeys 
          = 0;
                          // Pending registrations first, then queued writes, then ready keys.
                          processRegisterTaskQueue();
                          processWriteTaskQueue();
                          processSelectedKeys(selector.selectedKeys());

          // Exit the loop when there's nothing to handle.
          // The shutdown flag is used to delay the shutdown of this
          // loop to avoid excessive Selector creation when
          // connections are registered in a one-by-one manner instead of
          // concurrent manner.
                          if (selector.keys().isEmpty()) {
          if (shutdown ||
                                  executor 
          instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

          synchronized (startStopLock) {
          // Re-check under the lock: a registration may have been queued meanwhile.
          if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                                          started 
          = false;
          try {
                                              selector.close();
                                          } 
          catch (IOException e) {
                                              logger.warn(
          "Failed to close a selector.", e);
                                          } 
          finally {
          this.selector = null;
                                          }
          break;
                                      } 
          else {
                                          shutdown 
          = false;
                                      }
                                  }
                              } 
          else {
          // Give one more second.
                                  shutdown = true;
                              }
                          } 
          else {
                              shutdown 
          = false;
                          }
                      } 
          catch (Throwable t) {
                          logger.warn(
          "Unexpected exception in the selector loop.", t);

          // Prevent possible consecutive immediate failures that lead to
          // excessive CPU consumption.
                          try {
                              Thread.sleep(
          1000);
                          } 
          catch (InterruptedException e) {
          // Ignore.
                          }
                      }
                  }
              }


          來看processSelectedKeys(selector.selectedKeys());
           private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
                  
          for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
                      SelectionKey k 
          = i.next();
                      i.remove();
                      
          try {
                          
          int readyOps = k.readyOps();
                          
          //可讀,處理downstream
                          if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                              
          if (!read(k)) {
                                  
          // Connection already closed - no need to handle write.
                                  continue;
                              }
                          }
                          
          //可寫,處理upstream
                          if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                              writeFromSelectorLoop(k);
                          }
                      } 
          catch (CancelledKeyException e) {
                          close(k);
                      }

                      
          if (cleanUpCancelledKeys()) {
                          
          break// break the loop to avoid ConcurrentModificationException
                      }
                  }
              }
          從這個過程來看,在netty中,boss線程負責偵聽並接受socket連接,然后把已接受的連接分派給某個nioworker;每個nioworker內部維護著注冊任務隊列(registerTaskQueue)和寫任務隊列,nioworker線程在自己的selector循環中依次處理這些隊列中的任務,并處理已就緒的讀寫事件

          posts - 10, comments - 9, trackbacks - 0, articles - 15

          Copyright © alex_zheng

          主站蜘蛛池模板: 格尔木市| 双牌县| 乌拉特后旗| 台江县| 镇雄县| 宝山区| 黑河市| 池州市| 大冶市| 合作市| 喜德县| 双江| 合水县| 施甸县| 牡丹江市| 崇州市| 玉山县| 新营市| 常山县| 望都县| 英德市| 繁峙县| 商南县| 大足县| 珲春市| 惠来县| 吴桥县| 项城市| 历史| 虎林市| 宜都市| 平昌县| 徐汇区| 鄂伦春自治旗| 尼木县| 祁连县| 黔江区| 孟津县| 宁都县| 沁水县| 文安县|