I want to fly higher
          programming Explorer
          posts - 114,comments - 263,trackbacks - 0
          -

          1.源碼包結構


          2.測試線程

               1.啟動mina-exampleechoserver#Main

               2.啟動jconsole查看線程:

                   1.NioSocketAcceptor


                 RunnableAbstractPollingIoAcceptor$Acceptor

                  
          3.啟動com.game.landon.entrance.EchoClient,連接已啟動的echoserver.

                   ->此時查看線程->多了一個
                       2.NioProcessor        

                  

               RunnableAbstractPollingIoProcessor$Processor

              4.兩個線程池來源:
                    1.AbstractIoService(IoSessionConfig sessionConfig,Executor executor)
                         該構造函數中判斷
                         if(executor == null){
                              this.executor = Executors.newCachedThreadPool();
                         }
             詳見源碼.
                
                     2.SimpleIoProcessorPool
                          
          其構造函數判斷同上

          3.內部基本流程
               1.SocketAcceptor acceptor = new NioSocketAcceptor();

                    1.初始化NioSocketAcceptor線程池 {@link AbstractIoService }
                    2.初始化NioProcessor線程池  {@link SimpleIoProcessorPool }
                    3. NioSocketAcceptor#init
                      初始化Selector:selector = Selector.open();

               2.acceptor.bind(new InetSocketAddress(PORT));
                    1.AbstractPollingIoAcceptor#bindInternal
                         #startupAcceptor
                          >NioSocketAcceptor線程池執行Acceptor這個任務.

              //1.startupAcceptor啟動的時候也會判斷是否已存在Acceptor這個任務,如果不存在才會創建.
              
          // 2.Accetpor這個任務結束條件:所有bind的端口unbind->也會將Acceptor引用置為null.
              
          // 3.每個NioSocketAcceptor只有一個selector/且只對應一個Acceptor任務,即只有一個Acceptor線程.所以我們可以說Acceptor就是單線程的.(即便是一個CachedThreadPool)


              3.Acceptor#run
                    1.while(selectable)
                         1.selector.select

                         2.#registerHandles
                             ->NioSocketAcceptor#open
                             設置socket選項并向selector注冊OP_ACCEPT事件

                         3.#processHandles
                              1.NioSocketAcceptor#accept->返回NioSocketSession
                              2.SimpleIoProcessorPool#add
                                   1.根據sessionId將session與pool中的某個NioProcessor綁定 {@link SimpleIoProcessorPool#getProcessor}
                                   2.AbstractPollingIoProcessor#add
                                   3. AbstractPollingIoProcessor#startupProcessor
                                        ->NioProcessor線程池執行Processor這個任務

              // 從這段代碼看出:
                 
          //1.Processor這個任務只會創建一次.即每一個NioProcessor對象最多擁有一個Processor
                 
          // 2.每個NioProcessor只會向線程池提交一次Processor任務.而Processor任務是一個無限循環的任務.
                 
          // 也可以說,每個Processor就占據著線程池的一個線程.->即每個NioProcessor對象對應線程池中的一個線程
                 
          // 3.而session是與某個固定的NioProcessor綁定的(取模)->也就是說每個session的處理都是單線程的.(均在NioProcessor的唯一Processor線程執行)
                 
          // 4.public NioSocketAcceptor(int processorCount)構造中可指定processor的數目,其實最終是指定CacheThreadPool中多少用于prossor的線程數目.
                  
          //(默認:private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1)
                   // 5.每個NioProcessor中只有一個selector.
                  private void startupProcessor() {
                      Processor processor = processorRef.get();

                     
          if (processor == null) {
                          processor = new Processor();

                         
          if (processorRef.compareAndSet(null, processor)) {
                          executor.execute(new NamePreservingRunnable(processor, threadName));
                          }

                      }


                     
          // Just stop the select() and start it again, so that the processor
                     
          // can be activated immediately.
                      wakeup();
                      }

           

          /**
               * This private class is used to accept incoming connection from
               * clients. It's an infinite loop, which can be stopped when all
               * the registered handles have been removed (unbound).
               
          */

              
          private class Acceptor implements Runnable
              
          {
                  
          public void run()
                  
          {
                      
          int nHandles = 0;
                      lastIdleCheckTime = System.currentTimeMillis();

                      
          // Release the lock
                      lock.release();

                      
          while ( selectable )
                      
          {
                          
          try
                          
          {
                              
          int selected = select( SELECT_TIMEOUT );

                              nHandles += registerHandles();

                              
          if ( nHandles == 0 )
                              
          {
                                  
          try
                                  
          {
                                      lock.acquire();

                                      
          if ( registerQueue.isEmpty() && cancelQueue.isEmpty() )
                                      
          {
                                          acceptor = null;
                                          
          break;
                                      }

                                  }

                                  
          finally
                                  
          {
                                      lock.release();
                                  }

                              }


                              
          if ( selected > 0 )
                              
          {
                                  processReadySessions( selectedHandles() );
                              }


                              
          long currentTime = System.currentTimeMillis();
                              flushSessions( currentTime );
                              nHandles -= unregisterHandles();

                              notifyIdleSessions( currentTime );
                          }

                          
          catch ( ClosedSelectorException cse )
                          
          {
                              
          // If the selector has been closed, we can exit the loop
                              break;
                          }

                          
          catch ( Exception e )
                          
          {
                              ExceptionMonitor.getInstance().exceptionCaught( e );

                              
          try
                              
          {
                                  Thread.sleep( 1000 );
                              }

                              
          catch ( InterruptedException e1 )
                              
          {
                              }

                          }

                      }


                      
          if ( selectable && isDisposing() )
                      
          {
                          selectable = false;
                          
          try
                          
          {
                              destroy();
                          }

                          
          catch ( Exception e )
                          
          {
                              ExceptionMonitor.getInstance().exceptionCaught( e );
                          }

                          
          finally
                          
          {
                              disposalFuture.setValue( true );
                          }

                      }

                  }

              }

            
              2.Processor#run
                 1.for(;;)
                      1.select(SELECT_TIMEOUT)
                      2.#handleNewSessions
                           1. AbstractPollingIoProcessor#addNow

                           2. AbstractPollingIoProcessor#init
                                NioProcessor#init->session.getChannel向selector注冊OP_READ事件

                          3.#updateTrafficmask

                          4.#process
                               //process reads/process writes

                          5.#flush
                          6.#nofifyIdleSessions

          /**
               * The main loop. This is the place in charge to poll the Selector, and to
               * process the active sessions. It's done in
               * - handle the newly created sessions
               * -
              
          */

             
          private class Processor implements Runnable {
                 
          public void run() {
                     
          assert (processorRef.get() == this);

                     
          int nSessions = 0;
                      lastIdleCheckTime = System.currentTimeMillis();

                     
          for (;;) {
                         
          try {
                             
          // This select has a timeout so that we can manage
                             
          // idle session when we get out of the select every
                             
          // second. (note : this is a hack to avoid creating
                             
          // a dedicated thread).
                              long t0 = System.currentTimeMillis();
                             
          int selected = select(SELECT_TIMEOUT);
                             
          long t1 = System.currentTimeMillis();
                             
          long delta = (t1 - t0);

                             
          if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                                 
          // Last chance : the select() may have been
                                 
          // interrupted because we have had an closed channel.
                                  if (isBrokenConnection()) {
                                      LOG.warn("Broken connection");

                                     
          // we can reselect immediately
                                     
          // set back the flag to false
                                      wakeupCalled.getAndSet(false);

                                     
          continue;
                                  }
          else {
                                      LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                                     
          // Ok, we are hit by the nasty epoll
                                     
          // spinning.
                                     
          // Basically, there is a race condition
                                     
          // which causes a closing file descriptor not to be
                                     
          // considered as available as a selected channel, but
                                     
          // it stopped the select. The next time we will
                                     
          // call select(), it will exit immediately for the same
                                     
          // reason, and do so forever, consuming 100%
                                     
          // CPU.
                                     
          // We have to destroy the selector, and
                                     
          // register all the socket on a new one.
                                      registerNewSelector();
                                  }


                                 
          // Set back the flag to false
                                  wakeupCalled.getAndSet(false);

                                 
          // and continue the loop
                                  continue;
                              }


                             
          // Manage newly created session first
                              nSessions += handleNewSessions();

                              updateTrafficMask();

                             
          // Now, if we have had some incoming or outgoing events,
                             
          // deal with them
                              if (selected > 0) {
                                 
          //LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                                  process();
                              }


                             
          // Write the pending requests
                              long currentTime = System.currentTimeMillis();
                              flush(currentTime);

                             
          // And manage removed sessions
                              nSessions -= removeSessions();

                             
          // Last, not least, send Idle events to the idle sessions
                              notifyIdleSessions(currentTime);

                             
          // Get a chance to exit the infinite loop if there are no
                             
          // more sessions on this Processor
                              if (nSessions == 0) {
                                  processorRef.set(null);

                                 
          if (newSessions.isEmpty() && isSelectorEmpty()) {
                                     
          // newSessions.add() precedes startupProcessor
                                      assert (processorRef.get() != this);
                                     
          break;
                                  }


                                 
          assert (processorRef.get() != this);

                                 
          if (!processorRef.compareAndSet(null, this)) {
                                     
          // startupProcessor won race, so must exit processor
                                      assert (processorRef.get() != this);
                                     
          break;
                                  }


                                 
          assert (processorRef.get() == this);
                              }


                             
          // Disconnect all sessions immediately if disposal has been
                             
          // requested so that we exit this loop eventually.
                              if (isDisposing()) {
                                 
          for (Iterator<S> i = allSessions(); i.hasNext();) {
                                      scheduleRemove(i.next());
                                  }


                                  wakeup();
                              }

                          }
          catch (ClosedSelectorException cse) {
                             
          // If the selector has been closed, we can exit the loop
                              break;
                          }
          catch (Throwable t) {
                              ExceptionMonitor.getInstance().exceptionCaught(t);

                             
          try {
                                  Thread.sleep(1000);
                              }
          catch (InterruptedException e1) {
                                  ExceptionMonitor.getInstance().exceptionCaught(e1);
                              }

                          }

                      }


                     
          try {
                         
          synchronized (disposalLock) {
                             
          if (disposing) {
                                  doDispose();
                              }

                          }

                      }
          catch (Throwable t) {
                          ExceptionMonitor.getInstance().exceptionCaught(t);
                      }
          finally {
                          disposalFuture.setValue(
          true);
                      }

                  }

              }

           

           
          4.相關類圖關系

              1.IoService關系類圖

              2.IoProcessor關系類圖
              



              




              3.IoSession關系類圖 
                  


           5.總結:
              本篇只是引入篇,著重介紹了mina2內部的兩個acceptor線程池和processor線程池.關于nio相關請看我之前的文章.

          http://www.aygfsteel.com/landon/archive/2013/08/16/402947.html


           

          posted on 2013-11-18 17:24 landon 閱讀(2056) 評論(2)  編輯  收藏 所屬分類: Sources

          FeedBack:
          # re: apache-mina-2.07源碼筆記1-初步
          2013-11-18 18:41 | foo
          你這筆記可讀性也太差了  回復  更多評論
            
          # re: apache-mina-2.07源碼筆記1-初步[未登錄]
          2013-11-19 14:26 | landon
          確實有點.不過自己明白即可.哈哈.@foo
            回復  更多評論
            
          主站蜘蛛池模板: 泽普县| 南澳县| 宁波市| 宿迁市| 乾安县| 榕江县| 南丰县| 丹凤县| 辉南县| 从化市| 柳江县| 礼泉县| 托克逊县| 古田县| 嫩江县| 新绛县| 广南县| 新余市| 保德县| 武清区| 司法| 汾西县| 南木林县| 大竹县| 富裕县| 敦煌市| 南宁市| 宜阳县| 轮台县| 林州市| 德钦县| 砀山县| 荥经县| 唐山市| 芜湖市| 登封市| 边坝县| 平遥县| 阳原县| 和平区| 舟山市|