隨筆-9  評(píng)論-3  文章-0  trackbacks-0

          接著上面的流程,現(xiàn)在請(qǐng)求到了Poller的#register()方法。

              public void register(final NioChannel socket) {
                  socket.setPoller(
          this);
                  
          // KeyAttachment是對(duì)NioChannel信息的包裝,同樣是非GC
                  KeyAttachment key = keyCache.poll();
                  
          final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
                  ka.reset(
          this, socket, getSocketProperties().getSoTimeout());
                  ka.setKeepAliveLeft(NioEndpoint.
          this.getMaxKeepAliveRequests());
                  
                  
          // PollerEvent的初始化,非GC Again
                  PollerEvent r = eventCache.poll();
                  
          // this is what OP_REGISTER turns into.
                  
          // 讀取數(shù)據(jù)的事件
                  ka.interestOps(SelectionKey.OP_READ);
                  
          if (r == null)
                      r 
          = new PollerEvent(socket, ka, OP_REGISTER);
                  
          else
                      r.reset(socket, ka, OP_REGISTER);
                  
                  
          // 把事件加到Poller
                  addEvent(r);
              }


              
          public void addEvent(Runnable event) {
                  
          // 把事件加入到隊(duì)列中
                  events.offer(event);
                  
          // ++wakeupCounter
                  if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
              }

          其實(shí)也挺好懂的,就是把NioChannel作為OP_REGISTER事件注冊(cè)到Poller,這樣在Poller的#run()方法中就可以對(duì)加入Poller的事件進(jìn)行處理了。

              public void run() {
                  
          while (running) {
                      
          try {
                          
          while (paused && (!close)) {
                              
          try {
                                  Thread.sleep(
          100);
                              }
           catch (InterruptedException e) {
                                  
          // Ignore
                              }

                          }

                          
          boolean hasEvents = false;

                          hasEvents 
          = (hasEvents | events());
                          
          // Time to terminate?
                          if (close) {
                              timeout(
          0false);
                              
          break;
                          }

                          
          try {
                              
          if (!close) {
                                  
          if (wakeupCounter.get() > 0{
                                      
          // 立刻返回 I/O 就緒的那些通道的鍵
                                      keyCount = selector.selectNow();
                                  }
           else {
                                      keyCount 
          = selector.keys().size();
                                      
          // 這里把wakeupCounter設(shè)成-1,在addEvent的時(shí)候就會(huì)喚醒selector
                                      wakeupCounter.set(-1);
                                      
          // 使用阻塞的方式
                                      keyCount = selector.select(selectorTimeout);
                                  }

                                  wakeupCounter.set(
          0);
                              }

                              
          if (close) {
                                  timeout(
          0false);
                                  selector.close();
                                  
          break;
                              }

                          }
           catch (NullPointerException x) {
                              
          // sun bug 5076772 on windows JDK 1.5
                              if (log.isDebugEnabled())
                                  log.debug(
          "Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                              
          if (wakeupCounter == null || selector == null)
                                  
          throw x;
                              
          continue;
                          }
           catch (CancelledKeyException x) {
                              
          // sun bug 5076772 on windows JDK 1.5
                              if (log.isDebugEnabled())
                                  log.debug(
          "Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                              
          if (wakeupCounter == null || selector == null)
                                  
          throw x;
                              
          continue;
                          }
           catch (Throwable x) {
                              ExceptionUtils.handleThrowable(x);
                              log.error(
          "", x);
                              
          continue;
                          }

                          
          // either we timed out or we woke up, process events first
                          if (keyCount == 0)
                              hasEvents 
          = (hasEvents | events());

                          Iterator
          <SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
                                  : 
          null;
                          
          // Walk through the collection of ready keys and dispatch
                          
          // any active event.
                          while (iterator != null && iterator.hasNext()) {
                              SelectionKey sk 
          = iterator.next();
                              
          // 這里的KeyAttachment實(shí)在#register()方法中注冊(cè)的
                              KeyAttachment attachment = (KeyAttachment) sk.attachment();
                              attachment.access();
                              iterator.remove();
                              
          // 繼續(xù)流程
                              processKey(sk, attachment);
                          }
          // while

                          
          // process timeouts
                          timeout(keyCount, hasEvents);
                          
          if (oomParachute > 0 && oomParachuteData == null)
                              checkParachute();
                      }
           catch (OutOfMemoryError oom) {
                          
          try {
                              oomParachuteData 
          = null;
                              releaseCaches();
                              log.error(
          "", oom);
                          }
           catch (Throwable oomt) {
                              
          try {
                                  System.err.println(oomParachuteMsg);
                                  oomt.printStackTrace();
                              }
           catch (Throwable letsHopeWeDontGetHere) {
                                  ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                              }

                          }

                      }

                  }
          // while
                  synchronized (this{
                      
          this.notifyAll();
                  }

                  stopLatch.countDown();

              }

          這個(gè)方法有2個(gè)方法需要關(guān)注一下:#events()和#processKey():

              public boolean events() {
                  
          boolean result = false;
                  
          // synchronized (events) {
                  Runnable r = null;
                  
          // 返回是事件隊(duì)列中是否有事件
                  result = (events.size() > 0);
                  
          while ((r = events.poll()) != null{
                      
          try {
                          
          // 執(zhí)行KeyEvent的#run()
                          r.run();
                          
          if (r instanceof PollerEvent) {
                              ((PollerEvent) r).reset();
                              
          // 對(duì)KeyEvent進(jìn)行回收
                              eventCache.offer((PollerEvent) r);
                          }

                      }
           catch (Throwable x) {
                          log.error(
          "", x);
                      }

                  }

                  
          // events.clear();
                  
          // }
                  return result;
              }

          這里執(zhí)行了SocketChannel對(duì)應(yīng)的KeyEvent的#run()方法,在這個(gè)方法里給SocketChannel注冊(cè)了OP_READ:

              public void run() {
                  
          if (interestOps == OP_REGISTER) {
                      
          try {
                          
          // 給SocketChannel注冊(cè)O(shè)P_READ
                          socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
                                  key);
                      }
           catch (Exception x) {
                          log.error(
          "", x);
                      }

                  }
           else {
                      
          // 這里應(yīng)該是對(duì)comet進(jìn)行支持的,暫時(shí)先不看

                      ......

                  }
          // end if
              }
          // run

          第二個(gè)是#processKey()方法,里邊的很多流程我現(xiàn)在不是很關(guān)心,都略去了,

              protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
                  
          boolean result = true;
                  
          try {
                      
          if (close) {
                          cancelledKey(sk, SocketStatus.STOP, 
          false);
                      }
           else if (sk.isValid() && attachment != null{
                          attachment.access();
          // make sure we don't time out valid sockets
                          sk.attach(attachment);// cant remember why this is here
                          NioChannel channel = attachment.getChannel();
                          
          if (sk.isReadable() || sk.isWritable()) {
                              
          if (attachment.getSendfileData() != null{
                                  processSendfile(sk, attachment, 
          truefalse);
                              }
           else if (attachment.getComet()) {// 這里應(yīng)該是對(duì)comet的支持
                                  ......
                              }
           else {
                                  
          // 這個(gè)分支是現(xiàn)在比較關(guān)心的
                                  if (isWorkerAvailable()) {// 這個(gè)好像還沒(méi)實(shí)現(xiàn)
                                      
          // 這個(gè)#unreg()很巧妙,防止了通道對(duì)同一個(gè)事件不斷select的問(wèn)題
                                      unreg(sk, attachment, sk.readyOps());
                                      
          boolean close = (!processSocket(channel, nulltrue));
                                      
          if (close) {
                                          cancelledKey(sk, SocketStatus.DISCONNECT, 
          false);
                                      }

                                  }
           else {
                                      result 
          = false;
                                  }

                              }

                          }

                      }
           else {
                          
          // invalid key
                          cancelledKey(sk, SocketStatus.ERROR, false);
                      }

                  }
           catch (CancelledKeyException ckx) {
                      cancelledKey(sk, SocketStatus.ERROR, 
          false);
                  }
           catch (Throwable t) {
                      ExceptionUtils.handleThrowable(t);
                      log.error(
          "", t);
                  }

                  
          return result;
              }


              
          protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
                  reg(sk, attachment, sk.interestOps() 
          & (~readyOps));
              }


              
          protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
                  sk.interestOps(intops);
                  attachment.interestOps(intops);
                  attachment.setCometOps(intops);
              }

          這里的#unreg()方法據(jù)我理解應(yīng)該很巧妙的解決了重復(fù)的IO事件問(wèn)題,我自己寫(xiě)的測(cè)試用的NIO代碼里就會(huì)有這個(gè)問(wèn)題。

          這樣,流程就來(lái)到了Poller最后的#processSocket()方法了:

              public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
                  
          try {
                      KeyAttachment attachment 
          = (KeyAttachment) socket.getAttachment(false);
                      attachment.setCometNotify(
          false); // will get reset upon next reg
                      
          // 使用SocketProcessor
                      SocketProcessor sc = processorCache.poll();
                      
          if (sc == null)
                          sc 
          = new SocketProcessor(socket, status);
                      
          else
                          sc.reset(socket, status);
                      
          if (dispatch && getExecutor() != null)// 如果配置了ThreadPoolExecutor,那么使用它來(lái)執(zhí)行
                          getExecutor().execute(sc);
                      
          else
                          sc.run();
                  }
           catch (RejectedExecutionException rx) {
                      log.warn(
          "Socket processing request was rejected for:" + socket, rx);
                      
          return false;
                  }
           catch (Throwable t) {
                      ExceptionUtils.handleThrowable(t);
                      
          // This means we got an OOM or similar creating a thread, or that
                      
          // the pool and its queue are full
                      log.error(sm.getString("endpoint.process.fail"), t);
                      
          return false;
                  }

                  
          return true;
              }

          這里SocketProcessor的#run()方法就不列出了,里邊最后會(huì)通過(guò)下面的語(yǔ)句將流程轉(zhuǎn)到Http11NioProtocol類,其中的handler就是對(duì)Http11NioProtocol的引用:

                  SocketState state = SocketState.OPEN;
                  state 
          = (status==null)?handler.process(socket):handler.event(socket,status);

          最后,對(duì)Acceptor和Poller的處理過(guò)程做個(gè)小結(jié),見(jiàn)下圖:

          posted on 2010-12-08 08:48 臭美 閱讀(2502) 評(píng)論(0)  編輯  收藏 所屬分類: Tomcat

          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 喀喇沁旗| 根河市| 瓮安县| 大洼县| 湖口县| 永城市| 新乐市| 应用必备| 皮山县| 南溪县| 黄大仙区| 奉新县| 田林县| 荃湾区| 沧州市| 宁远县| 离岛区| 威宁| 彰武县| 贵港市| 理塘县| 康马县| 肃宁县| 陵川县| 万全县| 壶关县| 昂仁县| 巴南区| 威海市| 达拉特旗| 天津市| 芷江| 衡南县| 滦平县| 南澳县| 苏尼特右旗| 平顶山市| 昌图县| 阿拉善右旗| 鹤庆县| 漳州市|