I want to fly higher
          programming Explorer
          posts - 114,comments - 263,trackbacks - 0
          1.NioSocketAcceptor持有一個Selector對象.->調用bind方法后->AbstractPollingIoAcceptor#bindInternal. 

          protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
                      
          // 創建了一個綁定請求的future operation.當selector處理注冊的時候,會signal future.
                      AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
              
                      
          // 將請求加入注冊隊列
                      registerQueue.add(request);
              
                      
          // 創建acceptor任務并啟動(acceptor-worker線程),這個是單線程的
                      startupAcceptor();
              
                      
          // 這塊的細節很重要.因為Acceptor任務是一個while(){selector.select},此時accepor線程因為select操作為阻塞,因為此時沒有任何事件發生.
                      
          // 所以這邊用了一個信號量.在初始化Acceptor這個任務后并啟動后,釋放這個許可(信號量初始化為1).然后lock.acquire繼續執行.
                      try {
                          lock.acquire();
              
                          
          // 這里等待了10毫秒,是要給acceptor-worker線程機會執行任務.即進入while(select),即執行到selector處
                          Thread.sleep(10);
                          
          // 因為acceptor任務中的selector此時因為select操作阻塞,所以這里執行喚醒selector操作.進而可以處理之前加入注冊隊列的請求.
                          wakeup();
                      }
           finally {
                          lock.release();
                      }

              
                      
          // 阻塞,等到注冊隊列的請求被處理完成
                      request.awaitUninterruptibly();
                      
                      
              }

          2.

          private class Acceptor implements Runnable {
                  
          public void run() {
                      
                      
          // 釋放一個許可,使得主線程可以執行后續后續調度(喚醒selector).
                      lock.release();

                      
          // break的條件是之前bind的serversocket全部unbind了.
                      while (selectable) {
                          
          try {
                              
                              
          // selector執行select.
                              
          // 1.有新連接出現則被喚醒 2.在首次阻塞的時候被主線程wakeup(處理注冊OP_ACCEPT)
                              int selected = select();

                              
          // registerHandles做的主要事情是將注冊隊列的綁定地址,執行NioSocketAcceptor#open.
                              
          // 即(nio的一系列配置)1.ServerSocketChannel.open() 2.channel.configureBlocking(false)
                              
          // 3.ServerSocket socket = channel.socket() 4.socket.setReuseAddress(isReuseAddress())
                              
          // 5.socket.bind(localAddress, getBacklog()) 6.channel.register(selector, SelectionKey.OP_ACCEPT) 向selector注冊Acceptor事件
                              
          // 這里有兩個ServerSocket的參數可以設置 reuseAddress/backlog
                              nHandles += registerHandles();

                              
          ..檢查regiser是否成功.如果不成功則break
                              
          ..檢查取消隊列是否為空,如果為空則break(即沒有serversocket監聽了,都unbind了).

                              
          // 表明有新連接請求進來.
                              if (selected > 0{
                                  
          // 處理新連接請求.
                                  
          // 1.accept返回new NioSocketSession 2.初始化session 3.將其綁定到processor池(SimpleIoProcessorPool)的一個NioProcessor(SimpleIoProcessorPool#getProcessor,取模)
                                  
          // 4.AbstractPollingIoProcessor#add->將session加入NioProcessor的新創建的session隊列并startupProcessor
                                  
          // 注:startupProcessor方法做了引用判斷,即一個NioProcessor只會啟動一個Processor任務.(所以對于session的io讀寫也是單線程的.因為session是已經綁定了一個固定的NioProcessor中)
                                  processHandles(selectedHandles());
                              }


                              
          // 檢查是否調用了unbind.如果unbind則加入取消隊列.
                              nHandles -= unregisterHandles();
                              
                              
          .
                  }


          3.NioProcessor持有一個Selector對象.其初始化的時候會open selector.


          private class Processor implements Runnable {
                  
          public void run() {
                      
                      
          int nSessions = 0;
                      
          // 上一次空閑檢查時間
                      lastIdleCheckTime = System.currentTimeMillis();

                      
          // 無限循環.說明proceeeor會始終占用線程池的一個線程.并可以這樣說,NioProcessor的數目就是線程池工作線程的數目.
                      for (;;) {
                          
          try {
                              
          // 這里select有一個超時,是為了管理空閑session,超時時間是1s
                              long t0 = System.currentTimeMillis();
                              
          int selected = select(SELECT_TIMEOUT);
                              
          long t1 = System.currentTimeMillis();
                              
          long delta = (t1 - t0);

                              
          //(處理java6的nio的bug)
                              
          // 下面if這段代碼的大致意思是說如果select未超時且select未被喚醒且未有讀寫事件發生的一種情況.
                              
          // 1.說明可能select被中斷了.->然后檢查是否有channel被close了(如果有的話則key.cancel).如果是的話則繼續執行select.
                              
          // 2.如果檢查發現沒有channel被close則重新注冊一個新的Selector.
                              
          //(注意這里的檢查是之前NIO的bug.Selector應該只在2種情況有返回值,即有網絡事件發生或者超時。但是Selector有時卻會在沒有獲得任何selectionKey的情況返回.)
                              
          //(http://bugs.java.com/view_bug.do?bug_id=6693490)(http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933)
                              if ((selected == 0&& !wakeupCalled.get() && (delta < 100)) {
                                  
          // Last chance : the select() may have been
                                  
          // interrupted because we have had an closed channel.
                                  if (isBrokenConnection()) {
                                      LOG.warn("Broken connection");

                                      
          // we can reselect immediately
                                      
          // set back the flag to false
                                      wakeupCalled.getAndSet(false);

                                      
          continue;
                                  }
           else {
                                      LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
                                      
          // Ok, we are hit by the nasty epoll
                                      
          // spinning.
                                      
          // Basically, there is a race condition
                                      
          // which causes a closing file descriptor not to be
                                      
          // considered as available as a selected channel, but
                                      
          // it stopped the select. The next time we will
                                      
          // call select(), it will exit immediately for the same
                                      
          // reason, and do so forever, consuming 100%
                                      
          // CPU.
                                      
          // We have to destroy the selector, and
                                      
          // register all the socket on a new one.
                                      registerNewSelector();
                                  }


                                  
          // Set back the flag to false
                                  wakeupCalled.getAndSet(false);

                                  
          // and continue the loop
                                  continue;
                              }


                              
          // 處理新session
                              
          // 1.初始化NioSession.{@link NioProcessor#init},即將channel配置為非阻塞模式并向selector注冊OP_READ
                              
          // 2.fireSessionCreated/fireSessionOpened兩個事件.(注意這兩個區別,如果配置了線程模型ExecutorFilter.則sessionOpened事件在該線程模型內執行.因為其只覆寫了該方法,而沒有覆寫sessionOpened)
                              nSessions += handleNewSessions();

                              updateTrafficMask();

                              
          // 處理讀寫事件(對于已select的session)
                              
          // 1.處理讀的時候,即AbstractPollingIoProcessor#read,讀到的字節>0則觸發fireMessageReceived.另外對ReadBufferSize這個參數做了一些判斷(buffer會分配該大小).(即如果設置的太大則decrease,設置的太小則increase,根據讀到的字節數目.所以說為了避開這個判斷,該參數可設置在(readByte,2*readByte]這個區間)
                              
          // 2.處理寫,將session加入flush隊列.
                              if (selected > 0{
                                  
          //LOG.debug("Processing "); // This log hurts one of the MDCFilter test
                                  process();
                              }


                              
          // 寫未執行的請求
                              
          // 1.通過session.write(msg)時,AbstractIoSession#write時->會觸發fireFilterWrite事件.該觸發鏈是沿著tail->header的方向觸發的.
                              
          // 2.HeadFilter#filterWrite,session上有一個WriteRequestQueue.將WriteRequest加入該隊列.
                              
          // 3.喚醒selector.
                              
          //(注意第一次在write的時候,即writeRequestQueue為空的時候,是直接schedule_flush并wakeup selector(所以第一次也 沒有必要向selecor注冊寫事件,第一次肯定是可寫的).而后續的寫請求則是直接將請求插入隊列而已.只有再次寫隊列為空的時候則會再次schedule_flush并wakeup.另外如果session的寫請求未執行完畢則會向selector注冊寫事件,在可寫的時候依然會繼續執行寫.)
                              long currentTime = System.currentTimeMillis();
                              
          // 1.遍歷flushingSessions隊列. 重置該session schedule flush flag(這個標識表示該session有寫的request還未寫完).2.flushNow,從writeRequestQueue依次取出寫請求.
                              
          // 3.maxWrittenBytes = 1.5 * maxReadBufferSize,讀寫公平(注意這里:flushNow的while循環結束條件是writtenBytes < maxWrittenBytes.即一次flush不會超過最大寫字節數.)
                              
          // (其實這個處理就是為了讀寫公平,防止因為寫的數據過多而導致read不能得到及時響應.因為都是在一個processor線程處理的.)
                              
          // 4.如果session中當前請求的buffer已發送完畢,則觸發fireMessageSent事件.
                             
          // 5.如果session中請求的數據未全部發送完畢(buffer.hasRemaining),則session重新向selector注冊寫事件 OP_WRITE.
                              flush(currentTime);

                              
          // 注意這里:
                              
          // 1.當processor正在執行read的時候,如果客戶端端掉了連接,則NioProcessor.read這里就會拋出一個io異常:java.io.IOException: 遠程主機強迫關閉了一個現有的連接
                              
          // 2.read這段代碼在try/catch異常的時候:判斷了一下異常如果是ioexception且該異常不是PortUnreachableException或者不是udp相關,則執行scheduleRemove->removingSessions.add(session)
                              
          // 而下面這句代碼則是處理removeSessions.->removeNow->對被移除的session進行destory處理(close_channel/cancel_key)并清理session的寫隊列,fireSessionDestroyed->fireSessionClosed
                              nSessions -= removeSessions();

                              
          // if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT),即timeout內未有讀寫事件發生則通知空閑
                              
          // 遍歷session 判讀當前時間與上次事件發生時間的差是否大于空閑時間
                              notifyIdleSessions(currentTime);

                              
          .
              }


          4.再談io.
               IO分兩個階段:
             1.通知內核準備數據。2.數據從內核緩沖區拷貝到應用緩沖區

             根據這2點IO類型可以分成:
                 1.阻塞IO,在兩個階段上面都是阻塞的。
                 2.非阻塞IO,在第1階段,程序不斷的輪詢直到數據準備好,第2階段還是阻塞的
                 3.IO復用,在第1階段,當一個或者多個IO準備就緒時,通知程序,第2階段還是阻塞的,在第1階段還是輪詢實現的,只是所有的IO都集中在一個地方,這個地方進行輪詢
                 4.信號IO,當數據準備完畢的時候,信號通知程序數據準備完畢,第2階段阻塞
                 5.異步IO,1,2都不阻塞
                
             當然write是從應用緩沖區到內核緩沖區.
             2.selector底層基礎實現就應該是不斷的輪訓內核緩沖區的狀態.
             3.select模型僅僅是輪訓,知道有IO事件發生了.但是并不知道是哪些channel.所以只能輪訓所有的注冊channel,然后依次判斷讀寫;引入epoll->會把哪個channel發生了什么io事件直接通知.

          posted on 2014-03-07 17:01 landon 閱讀(1985) 評論(2)  編輯  收藏 所屬分類: ProgramSources

          FeedBack:
          # re: apache-mina-2.07源碼筆記6-nio細節
          2014-03-09 11:34 | 鵬達鎖業
          給力支持 博主。。。。。。贊一個

            回復  更多評論
            
          # re: apache-mina-2.07源碼筆記6-nio細節
          2015-11-07 16:53 | qwert
          樓主大贊,分析的詳細多了,比其他的  回復  更多評論
            
          主站蜘蛛池模板: 怀宁县| 繁昌县| 旬邑县| 永寿县| 彝良县| 江津市| 康平县| 姚安县| 固阳县| 咸宁市| 佛学| 漠河县| 泸溪县| 凤翔县| 同心县| 勃利县| 河北省| 景德镇市| 鄂伦春自治旗| 蓬溪县| 北安市| 尼勒克县| 高雄市| 梓潼县| 孝感市| 江门市| 准格尔旗| 多伦县| 黔西县| 娄底市| 锡林郭勒盟| 蓬溪县| 元氏县| 虹口区| 石门县| 巴南区| 云龙县| 万盛区| 酒泉市| 乌海市| 尚义县|