莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          nio框架中的多個Selector結構

          Posted on 2009-10-06 16:10 dennis 閱讀(5849) 評論(2)  編輯  收藏 所屬分類: java
              隨著并發數量的提高,傳統nio框架采用一個Selector來支撐大量連接事件的管理和觸發已經遇到瓶頸,因此現在各種nio框架的新版本都采用多個Selector并存的結構,由多個Selector均衡地去管理大量連接。這里以Mina和Grizzly的實現為例。

             在Mina 2.0中,Selector的管理是由org.apache.mina.transport.socket.nio.NioProcessor來處理,每個NioProcessor對象保存一個Selector,負責具體的select、wakeup、channel的注冊和取消、讀寫事件的注冊和判斷、實際的IO讀寫操作等等,核心代碼如下:
             public NioProcessor(Executor executor) {
                  
          super(executor);
                  
          try {
                      
          // Open a new selector
                      selector = Selector.open();
                  } 
          catch (IOException e) {
                      
          throw new RuntimeIoException("Failed to open a selector.", e);
                  }
              }


              
          protected int select(long timeout) throws Exception {
                  
          return selector.select(timeout);
              }

           
              
          protected boolean isInterestedInRead(NioSession session) {
                  SelectionKey key 
          = session.getSelectionKey();
                  
          return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
              }


              
          protected boolean isInterestedInWrite(NioSession session) {
                  SelectionKey key 
          = session.getSelectionKey();
                  
          return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
              }

              
          protected int read(NioSession session, IoBuffer buf) throws Exception {
                  
          return session.getChannel().read(buf.buf());
              }


              
          protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
                  
          if (buf.remaining() <= length) {
                      
          return session.getChannel().write(buf.buf());
                  } 
          else {
                      
          int oldLimit = buf.limit();
                      buf.limit(buf.position() 
          + length);
                      
          try {
                          
          return session.getChannel().write(buf.buf());
                      } 
          finally {
                          buf.limit(oldLimit);
                      }
                  }
              }

             這些方法的調用都是通過AbstractPollingIoProcessor來處理,這個類里可以看到一個nio框架的核心邏輯,注冊、select、派發,具體因為與本文主題不合,不再展開。NioProcessor的初始化是在NioSocketAcceptor的構造方法中調用的:

           public NioSocketAcceptor() {
                  
          super(new DefaultSocketSessionConfig(), NioProcessor.class);
                  ((DefaultSocketSessionConfig) getSessionConfig()).init(
          this);
              }

             直接調用了父類AbstractPollingIoAcceptor的構造函數,在其中我們可以看到,默認是啟動了一個SimpleIoProcessorPool來包裝NioProcessor:
          protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
                      Class
          <? extends IoProcessor<T>> processorClass) {
                  
          this(sessionConfig, nullnew SimpleIoProcessorPool<T>(processorClass),
                          
          true);
              }

             這里其實是一個組合模式,SimpleIoProcessorPool和NioProcessor都實現了Processor接口,一個是組合形成的Processor池,而另一個是單獨的類。調用的SimpleIoProcessorPool的構造函數是這樣:

              private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1
              public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
                  
          this(processorType, null, DEFAULT_SIZE);
              }
              可以看到,默認的池大小是cpu個數+1,也就是創建了cpu+1個的Selector對象。它的重載構造函數里是創建了一個數組,啟動一個CachedThreadPool來運行NioProcessor,通過反射創建具體的Processor對象,這里就不再列出了。

              Mina當有一個新連接建立的時候,就創建一個NioSocketSession,并且傳入上面的SimpleIoProcessorPool,當連接初始化的時候將Session加入SimpleIoProcessorPool:
              protected NioSession accept(IoProcessor<NioSession> processor,
                      ServerSocketChannel handle) 
          throws Exception {

                  SelectionKey key 
          = handle.keyFor(selector);
                  
                  
          if ((key == null|| (!key.isValid()) || (!key.isAcceptable()) ) {
                      
          return null;
                  }

                  
          // accept the connection from the client
                  SocketChannel ch = handle.accept();
                  
                  
          if (ch == null) {
                      
          return null;
                  }

                  
          return new NioSocketSession(this, processor, ch);
              }

                 
                  
          private void processHandles(Iterator<H> handles) throws Exception {
                      
          while (handles.hasNext()) {
                          H handle 
          = handles.next();
                          handles.remove();

                          
          // Associates a new created connection to a processor,
                          
          // and get back a session
                          T session = accept(processor, handle);
                          
                          
          if (session == null) {
                              
          break;
                          }

                          initSession(session, 
          nullnull);

                          
          // add the session to the SocketIoProcessor
                          session.getProcessor().add(session);
                      }
                  }

              加入的操作是遞增一個整型變量并且模數組大小后對應的NioProcessor注冊到session里:

              
          private IoProcessor<T> nextProcessor() {
                  checkDisposal();
                  
          return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
              }

           
             if (p == null) {
                      p 
          = nextProcessor();
                      IoProcessor
          <T> oldp =
                          (IoProcessor
          <T>) session.setAttributeIfAbsent(PROCESSOR, p);
                      
          if (oldp != null) {
                          p 
          = oldp;
                      }
              }


              這樣一來,每個連接都關聯一個NioProcessor,也就是關聯一個Selector對象,避免了所有連接共用一個Selector負載過高導致server響應變慢的后果。但是注意到NioSocketAcceptor也有一個Selector,這個Selector用來干什么的呢?那就是集中處理OP_ACCEPT事件的Selector,主要用于連接的接入,不跟處理讀寫事件的Selector混在一起,因此Mina的默認open的Selector是cpu+2個。

              看完mina2.0之后,我們來看看Grizzly2.0是怎么處理的,Grizzly還是比較保守,它默認就是啟動兩個Selector,其中一個專門負責accept,另一個負責連接的IO讀寫事件的管理。Grizzly 2.0中Selector的管理是通過SelectorRunner類,這個類封裝了Selector對象以及核心的分發注冊邏輯,你可以將他理解成Mina中的NioProcessor,核心的代碼如下:

          protected boolean doSelect() {
                  selectorHandler 
          = transport.getSelectorHandler();
                  selectionKeyHandler 
          = transport.getSelectionKeyHandler();
                  strategy 
          = transport.getStrategy();
                  
                  
          try {

                      
          if (isResume) {
                          
          // If resume SelectorRunner - finish postponed keys
                          isResume = false;
                          
          if (keyReadyOps != 0) {
                              
          if (!iterateKeyEvents()) return false;
                          }
                          
                          
          if (!iterateKeys()) return false;
                      }

                      lastSelectedKeysCount 
          = 0;
                      
                      selectorHandler.preSelect(
          this);
                      
                      readyKeys 
          = selectorHandler.select(this);

                      
          if (stateHolder.getState(false== State.STOPPING) return false;
                      
                      lastSelectedKeysCount 
          = readyKeys.size();
                      
                      
          if (lastSelectedKeysCount != 0) {
                          iterator 
          = readyKeys.iterator();
                          
          if (!iterateKeys()) return false;
                      }

                      selectorHandler.postSelect(
          this);
                  } 
          catch (ClosedSelectorException e) {
                      notifyConnectionException(key,
                              
          "Selector was unexpectedly closed", e,
                              Severity.TRANSPORT, Level.SEVERE, Level.FINE);
                  } 
          catch (Exception e) {
                      notifyConnectionException(key,
                              
          "doSelect exception", e,
                              Severity.UNKNOWN, Level.SEVERE, Level.FINE);
                  } 
          catch (Throwable t) {
                      logger.log(Level.SEVERE,
          "doSelect exception", t);
                      transport.notifyException(Severity.FATAL, t);
                  }

                  
          return true;
              }

              基本上是一個reactor實現的樣子,在AbstractNIOTransport類維護了一個SelectorRunner的數組,而Grizzly用于創建tcp server的類TCPNIOTransport正是繼承于AbstractNIOTransport類,在它的start方法中調用了startSelectorRunners來創建并啟動SelectorRunner數組:
            private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;
           @Override
            
          public void start() throws IOException {

            
          if (selectorRunnersCount <= 0) {
                          selectorRunnersCount 
          = DEFAULT_SELECTOR_RUNNERS_COUNT;
                      }
            startSelectorRunners();

          }

           protected void startSelectorRunners() throws IOException {
                  selectorRunners 
          = new SelectorRunner[selectorRunnersCount];
                  
                  
          synchronized(selectorRunners) {
                      
          for (int i = 0; i < selectorRunnersCount; i++) {
                          SelectorRunner runner 
          =
                                  
          new SelectorRunner(this, SelectorFactory.instance().create());
                          runner.start();
                          selectorRunners[i] 
          = runner;
                      }
                  }
              }

            可見Grizzly并沒有采用一個單獨的池對象來管理SelectorRunner,而是直接采用數組管理,默認數組大小是2。SelectorRunner實現了Runnable接口,它的start方法調用了一個線程池來運行自身。剛才我提到了說Grizzly的Accept是單獨一個Selector來管理的,那么是如何表現的呢?答案在RoundRobinConnectionDistributor類,這個類是用于派發注冊事件到相應的SelectorRunner上,它的派發方式是這樣:

           public Future<RegisterChannelResult> registerChannelAsync(
                      SelectableChannel channel, 
          int interestOps, Object attachment,
                      CompletionHandler completionHandler) 
                      
          throws IOException {
                  SelectorRunner runner 
          = getSelectorRunner(interestOps);
                  
                  
          return transport.getSelectorHandler().registerChannelAsync(
                          runner, channel, interestOps, attachment, completionHandler);
              }
              
              
          private SelectorRunner getSelectorRunner(int interestOps) {
                  SelectorRunner[] runners 
          = getTransportSelectorRunners();
                  
          int index;
                  
          if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
                      index 
          = 0;
                  } 
          else {
                      index 
          = (counter.incrementAndGet() % (runners.length - 1)) + 1;
                  }
                  
                  
          return runners[index];
              }

              getSelectorRunner這個方法道出了秘密,如果是OP_ACCEPT,那么都使用數組中的第一個SelectorRunner,如果不是,那么就通過取模運算的結果+1從后面的SelectorRunner中取一個來注冊。

              分析完mina2.0和grizzly2.0對Selector的管理后我們可以得到幾個啟示:

          1、在處理大量連接的情況下,多個Selector比單個Selector好
          2、多個Selector的情況下,處理OP_READ和OP_WRITE的Selector要與處理OP_ACCEPT的Selector分離,也就是說處理接入應該要一個單獨的Selector對象來處理,避免IO讀寫事件影響接入速度。
          3、Selector的數目問題,mina默認是cpu+2,而grizzly總共就2個,我更傾向于mina的策略,但是我認為應該對cpu個數做一個判斷,如果CPU個數超過8個,那么更多的Selector線程可能帶來比較大的線程切換的開銷,mina默認的策略并非合適,幸好可以設置這個數值。

             
             


              

          評論

          # re: nio框架中的多個Selector結構  回復  更多評論   

          2009-10-07 17:51 by Sunng
          學習了,謝謝lz

          # re: nio框架中的多個Selector結構[未登錄]  回復  更多評論   

          2012-04-06 00:59 by ak47
          能否請教下為何“如果CPU個數超過8個,那么更多的Selector線程可能帶來比較大的線程切換的開銷”?
          主站蜘蛛池模板: 湄潭县| 九江市| 子洲县| 吉林省| 鄯善县| 新干县| 广东省| 阜宁县| 澳门| 多伦县| 东乌珠穆沁旗| 洛浦县| 天镇县| 永修县| 芜湖县| 常山县| 佳木斯市| 汉川市| 怀远县| 徐水县| 布尔津县| 门头沟区| 花莲市| 称多县| 兴安县| 东台市| 石林| 卢氏县| 阿拉善右旗| 云南省| 贺州市| 张北县| 赣州市| 四川省| 贵南县| 墨江| 漠河县| 威海市| 巩留县| 调兵山市| 克山县|