隨筆-9  評論-3  文章-0  trackbacks-0

          接著上面的流程,現在請求到了Poller的#register()方法。

              public void register(final NioChannel socket) {
                  socket.setPoller(
          this);
                  
          // KeyAttachment是對NioChannel信息的包裝,同樣是非GC
                  KeyAttachment key = keyCache.poll();
                  
          final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
                  ka.reset(
          this, socket, getSocketProperties().getSoTimeout());
                  ka.setKeepAliveLeft(NioEndpoint.
          this.getMaxKeepAliveRequests());
                  
                  
          // PollerEvent的初始化,非GC Again
                  PollerEvent r = eventCache.poll();
                  
          // this is what OP_REGISTER turns into.
                  
          // 讀取數據的事件
                  ka.interestOps(SelectionKey.OP_READ);
                  
          if (r == null)
                      r 
          = new PollerEvent(socket, ka, OP_REGISTER);
                  
          else
                      r.reset(socket, ka, OP_REGISTER);
                  
                  
          // 把事件加到Poller
                  addEvent(r);
              }


              
          public void addEvent(Runnable event) {
                  
          // 把事件加入到隊列中
                  events.offer(event);
                  
          // ++wakeupCounter
                  if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
              }

          其實也挺好懂的,就是把NioChannel作為OP_REGISTER事件注冊到Poller,這樣在Poller的#run()方法中就可以對加入Poller的事件進行處理了。

              public void run() {
                  
          while (running) {
                      
          try {
                          
          while (paused && (!close)) {
                              
          try {
                                  Thread.sleep(
          100);
                              }
           catch (InterruptedException e) {
                                  
          // Ignore
                              }

                          }

                          
          boolean hasEvents = false;

                          hasEvents 
          = (hasEvents | events());
                          
          // Time to terminate?
                          if (close) {
                              timeout(
          0false);
                              
          break;
                          }

                          
          try {
                              
          if (!close) {
                                  
          if (wakeupCounter.get() > 0{
                                      
          // 立刻返回 I/O 就緒的那些通道的鍵
                                      keyCount = selector.selectNow();
                                  }
           else {
                                      keyCount 
          = selector.keys().size();
                                      
          // 這里把wakeupCounter設成-1,在addEvent的時候就會喚醒selector
                                      wakeupCounter.set(-1);
                                      
          // 使用阻塞的方式
                                      keyCount = selector.select(selectorTimeout);
                                  }

                                  wakeupCounter.set(
          0);
                              }

                              
          if (close) {
                                  timeout(
          0false);
                                  selector.close();
                                  
          break;
                              }

                          }
           catch (NullPointerException x) {
                              
          // sun bug 5076772 on windows JDK 1.5
                              if (log.isDebugEnabled())
                                  log.debug(
          "Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                              
          if (wakeupCounter == null || selector == null)
                                  
          throw x;
                              
          continue;
                          }
           catch (CancelledKeyException x) {
                              
          // sun bug 5076772 on windows JDK 1.5
                              if (log.isDebugEnabled())
                                  log.debug(
          "Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                              
          if (wakeupCounter == null || selector == null)
                                  
          throw x;
                              
          continue;
                          }
           catch (Throwable x) {
                              ExceptionUtils.handleThrowable(x);
                              log.error(
          "", x);
                              
          continue;
                          }

                          
          // either we timed out or we woke up, process events first
                          if (keyCount == 0)
                              hasEvents 
          = (hasEvents | events());

                          Iterator
          <SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
                                  : 
          null;
                          
          // Walk through the collection of ready keys and dispatch
                          
          // any active event.
                          while (iterator != null && iterator.hasNext()) {
                              SelectionKey sk 
          = iterator.next();
                              
          // 這里的KeyAttachment實在#register()方法中注冊的
                              KeyAttachment attachment = (KeyAttachment) sk.attachment();
                              attachment.access();
                              iterator.remove();
                              
          // 繼續流程
                              processKey(sk, attachment);
                          }
          // while

                          
          // process timeouts
                          timeout(keyCount, hasEvents);
                          
          if (oomParachute > 0 && oomParachuteData == null)
                              checkParachute();
                      }
           catch (OutOfMemoryError oom) {
                          
          try {
                              oomParachuteData 
          = null;
                              releaseCaches();
                              log.error(
          "", oom);
                          }
           catch (Throwable oomt) {
                              
          try {
                                  System.err.println(oomParachuteMsg);
                                  oomt.printStackTrace();
                              }
           catch (Throwable letsHopeWeDontGetHere) {
                                  ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                              }

                          }

                      }

                  }
          // while
                  synchronized (this{
                      
          this.notifyAll();
                  }

                  stopLatch.countDown();

              }

          這個方法有2個方法需要關注一下:#events()和#processKey():

              public boolean events() {
                  
          boolean result = false;
                  
          // synchronized (events) {
                  Runnable r = null;
                  
          // 返回是事件隊列中是否有事件
                  result = (events.size() > 0);
                  
          while ((r = events.poll()) != null{
                      
          try {
                          
          // 執行KeyEvent的#run()
                          r.run();
                          
          if (r instanceof PollerEvent) {
                              ((PollerEvent) r).reset();
                              
          // 對KeyEvent進行回收
                              eventCache.offer((PollerEvent) r);
                          }

                      }
           catch (Throwable x) {
                          log.error(
          "", x);
                      }

                  }

                  
          // events.clear();
                  
          // }
                  return result;
              }

          這里執行了SocketChannel對應的KeyEvent的#run()方法,在這個方法里給SocketChannel注冊了OP_READ:

              public void run() {
                  
          if (interestOps == OP_REGISTER) {
                      
          try {
                          
          // 給SocketChannel注冊OP_READ
                          socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
                                  key);
                      }
           catch (Exception x) {
                          log.error(
          "", x);
                      }

                  }
           else {
                      
          // 這里應該是對comet進行支持的,暫時先不看

                      ......

                  }
          // end if
              }
          // run

          第二個是#processKey()方法,里邊的很多流程我現在不是很關心,都略去了,

              protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
                  
          boolean result = true;
                  
          try {
                      
          if (close) {
                          cancelledKey(sk, SocketStatus.STOP, 
          false);
                      }
           else if (sk.isValid() && attachment != null{
                          attachment.access();
          // make sure we don't time out valid sockets
                          sk.attach(attachment);// cant remember why this is here
                          NioChannel channel = attachment.getChannel();
                          
          if (sk.isReadable() || sk.isWritable()) {
                              
          if (attachment.getSendfileData() != null{
                                  processSendfile(sk, attachment, 
          truefalse);
                              }
           else if (attachment.getComet()) {// 這里應該是對comet的支持
                                  ......
                              }
           else {
                                  
          // 這個分支是現在比較關心的
                                  if (isWorkerAvailable()) {// 這個好像還沒實現
                                      
          // 這個#unreg()很巧妙,防止了通道對同一個事件不斷select的問題
                                      unreg(sk, attachment, sk.readyOps());
                                      
          boolean close = (!processSocket(channel, nulltrue));
                                      
          if (close) {
                                          cancelledKey(sk, SocketStatus.DISCONNECT, 
          false);
                                      }

                                  }
           else {
                                      result 
          = false;
                                  }

                              }

                          }

                      }
           else {
                          
          // invalid key
                          cancelledKey(sk, SocketStatus.ERROR, false);
                      }

                  }
           catch (CancelledKeyException ckx) {
                      cancelledKey(sk, SocketStatus.ERROR, 
          false);
                  }
           catch (Throwable t) {
                      ExceptionUtils.handleThrowable(t);
                      log.error(
          "", t);
                  }

                  
          return result;
              }


              
          protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
                  reg(sk, attachment, sk.interestOps() 
          & (~readyOps));
              }


              
          protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
                  sk.interestOps(intops);
                  attachment.interestOps(intops);
                  attachment.setCometOps(intops);
              }

          這里的#unreg()方法據我理解應該很巧妙的解決了重復的IO事件問題,我自己寫的測試用的NIO代碼里就會有這個問題。

          這樣,流程就來到了Poller最后的#processSocket()方法了:

              public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
                  
          try {
                      KeyAttachment attachment 
          = (KeyAttachment) socket.getAttachment(false);
                      attachment.setCometNotify(
          false); // will get reset upon next reg
                      
          // 使用SocketProcessor
                      SocketProcessor sc = processorCache.poll();
                      
          if (sc == null)
                          sc 
          = new SocketProcessor(socket, status);
                      
          else
                          sc.reset(socket, status);
                      
          if (dispatch && getExecutor() != null)// 如果配置了ThreadPoolExecutor,那么使用它來執行
                          getExecutor().execute(sc);
                      
          else
                          sc.run();
                  }
           catch (RejectedExecutionException rx) {
                      log.warn(
          "Socket processing request was rejected for:" + socket, rx);
                      
          return false;
                  }
           catch (Throwable t) {
                      ExceptionUtils.handleThrowable(t);
                      
          // This means we got an OOM or similar creating a thread, or that
                      
          // the pool and its queue are full
                      log.error(sm.getString("endpoint.process.fail"), t);
                      
          return false;
                  }

                  
          return true;
              }

          這里SocketProcessor的#run()方法就不列出了,里邊最后會通過下面的語句將流程轉到Http11NioProtocol類,其中的handler就是對Http11NioProtocol的引用:

                  SocketState state = SocketState.OPEN;
                  state 
          = (status==null)?handler.process(socket):handler.event(socket,status);

          最后,對Acceptor和Poller的處理過程做個小結,見下圖:

          posted on 2010-12-08 08:48 臭美 閱讀(2502) 評論(0)  編輯  收藏 所屬分類: Tomcat
          主站蜘蛛池模板: 伊通| 民和| 友谊县| 资中县| 泸溪县| 阜康市| 祁连县| 西华县| 边坝县| 太仓市| 湖北省| 湘潭县| 新竹市| 峨山| 伊金霍洛旗| 商水县| 孝感市| 长兴县| 肥乡县| 佛坪县| 嵊州市| 财经| 丹凤县| 泗阳县| 吉木乃县| 商洛市| 桓台县| 洱源县| 叙永县| 宁城县| 赫章县| 鲜城| 汝阳县| 蒲城县| 会昌县| 饶平县| 都江堰市| 镇安县| 外汇| 怀来县| 永和县|