Continuing the flow from above, the request has now reached the Poller's #register() method.

public void register(final NioChannel socket) {
    socket.setPoller(this);
    // KeyAttachment wraps the NioChannel's state; it is taken from a cache to avoid creating garbage
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
    ka.reset(this, socket, getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    // PollerEvent is likewise taken from a cache to avoid garbage
    PollerEvent r = eventCache.poll();
    // this is what OP_REGISTER turns into.
    // the interest is in read events
    ka.interestOps(SelectionKey.OP_READ);
    if (r == null)
        r = new PollerEvent(socket, ka, OP_REGISTER);
    else
        r.reset(socket, ka, OP_REGISTER);
    // add the event to the Poller
    addEvent(r);
}


public void addEvent(Runnable event) {
    // put the event on the queue
    events.offer(event);
    // ++wakeupCounter; a result of 0 means the Poller had set it to -1 before a blocking select
    if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
}
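
The "taken from a cache to avoid garbage" idea used for keyCache and eventCache can be shown in isolation. The following is only a minimal sketch of that recycling pattern, not Tomcat's code: ReusableEvent, EventCacheSketch, acquire() and release() are hypothetical names invented for illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for PollerEvent: a mutable object that can be reset and reused.
final class ReusableEvent {
    private String payload;

    ReusableEvent(String payload) { this.payload = payload; }

    void reset(String payload) { this.payload = payload; }

    String payload() { return payload; }
}

final class EventCacheSketch {
    // Plays the role of eventCache: a lock-free queue of idle instances.
    private final ConcurrentLinkedQueue<ReusableEvent> cache = new ConcurrentLinkedQueue<>();

    // Reuse an idle instance if one exists, otherwise allocate a new one.
    ReusableEvent acquire(String payload) {
        ReusableEvent e = cache.poll();
        if (e == null) {
            return new ReusableEvent(payload);
        }
        e.reset(payload);
        return e;
    }

    // Clear the instance and hand it back so a later acquire() can reuse it.
    void release(ReusableEvent e) {
        e.reset(null);
        cache.offer(e);
    }

    public static void main(String[] args) {
        EventCacheSketch sketch = new EventCacheSketch();
        ReusableEvent first = sketch.acquire("a");
        sketch.release(first);
        ReusableEvent second = sketch.acquire("b");
        // Prints whether the second acquire() got the recycled instance.
        System.out.println(first == second);
    }
}
```

The trade-off is the usual one for object pools: fewer short-lived allocations on a hot path, in exchange for having to reset every field correctly before reuse.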
This is fairly easy to follow: the NioChannel is wrapped in an OP_REGISTER event and queued on the Poller, so that the Poller's #run() method can then process the queued events.

public void run() {
    while (running) {
        try {
            while (paused && (!close)) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            boolean hasEvents = false;

            hasEvents = (hasEvents | events());
            // Time to terminate?
            if (close) {
                timeout(0, false);
                break;
            }

            try {
                if (!close) {
                    if (wakeupCounter.get() > 0) {
                        // events were added: return the keys of the I/O-ready channels immediately
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.keys().size();
                        // set wakeupCounter to -1 so that addEvent() will wake the selector
                        wakeupCounter.set(-1);
                        // blocking select
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }

                if (close) {
                    timeout(0, false);
                    selector.close();
                    break;
                }
            } catch (NullPointerException x) {
                // sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null)
                    throw x;
                continue;
            } catch (CancelledKeyException x) {
                // sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null)
                    throw x;
                continue;
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error("", x);
                continue;
            }
            // either we timed out or we woke up, process events first
            if (keyCount == 0)
                hasEvents = (hasEvents | events());

            Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
                    : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // this is the KeyAttachment that was created in #register()
                KeyAttachment attachment = (KeyAttachment) sk.attachment();
                attachment.access();
                iterator.remove();
                // the flow continues here
                processKey(sk, attachment);
            }// while

            // process timeouts
            timeout(keyCount, hasEvents);
            if (oomParachute > 0 && oomParachuteData == null)
                checkParachute();

        } catch (OutOfMemoryError oom) {
            try {
                oomParachuteData = null;
                releaseCaches();
                log.error("", oom);
            } catch (Throwable oomt) {
                try {
                    System.err.println(oomParachuteMsg);
                    oomt.printStackTrace();
                } catch (Throwable letsHopeWeDontGetHere) {
                    ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                }
            }
        }
    }// while

    synchronized (this) {
        this.notifyAll();
    }
    stopLatch.countDown();

}
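
The interplay between addEvent() and the select calls in #run() is easier to see when it is pulled out on its own. Below is a rough sketch of that wakeupCounter handshake as I read it from the listings above; WakeupSketch, selectOnce() and the wakeupCalls counter are hypothetical names added for illustration and are not part of Tomcat.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class WakeupSketch {
    final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    final AtomicLong wakeupCounter = new AtomicLong(0);
    // Counts selector.wakeup() calls; exists only so the behaviour can be observed.
    final AtomicInteger wakeupCalls = new AtomicInteger(0);
    final Selector selector;

    WakeupSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Producer side: the increment yields 0 only if the poller had set the counter to -1.
    void addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    // Poller side: one pass of the select logic from run().
    int selectOnce(long timeoutMillis) {
        try {
            int keyCount;
            if (wakeupCounter.get() > 0) {
                // Events arrived since the last pass: do not block.
                keyCount = selector.selectNow();
            } else {
                // Announce "about to block" so the next addEvent() calls wakeup().
                wakeupCounter.set(-1);
                keyCount = selector.select(timeoutMillis);
            }
            wakeupCounter.set(0);
            return keyCount;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the counter is that wakeup() is only called when the poller may actually be blocked in select(); while the poller is busy, producers just increment the counter and the next pass uses selectNow().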
Two calls in the Poller's #run() deserve a closer look: #events() and #processKey():

public boolean events() {
    boolean result = false;
    // synchronized (events) {
    Runnable r = null;
    // report whether the event queue held any events
    result = (events.size() > 0);

    while ((r = events.poll()) != null) {
        try {
            // execute the PollerEvent's #run()
            r.run();

            if (r instanceof PollerEvent) {
                ((PollerEvent) r).reset();
                // recycle the PollerEvent
                eventCache.offer((PollerEvent) r);
            }
        } catch (Throwable x) {
            log.error("", x);
        }
    }
    // events.clear();
    // }
    return result;
}
This executes the #run() method of the PollerEvent that belongs to the SocketChannel, and that method registers the SocketChannel for OP_READ:

public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            // register the SocketChannel for OP_READ, with the KeyAttachment as attachment
            socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
                    key);
        } catch (Exception x) {
            log.error("", x);
        }
    } else {
        // this branch appears to support comet; skipped for now

        ......

    }// end if
}// run
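
To see how an object passed as the third argument of register() comes back through SelectionKey.attachment(), as the KeyAttachment does in the Poller's #run(), here is a small self-contained sketch. It uses a java.nio.channels.Pipe instead of a socket so that it needs no network; AttachmentSketch and registerAndSelect() are hypothetical names for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class AttachmentSketch {
    // Registers a readable channel with the given attachment, makes it ready,
    // and returns the attachment found on the selected key (null if none is ready).
    static Object registerAndSelect(Object attachment) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                // A channel must be non-blocking before it can be registered.
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ, attachment);

                // Writing one byte to the sink makes the source readable.
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

                if (selector.select(1000) == 0) {
                    return null;
                }
                SelectionKey key = selector.selectedKeys().iterator().next();
                return key.isReadable() ? key.attachment() : null;
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object marker = new Object();
        // Prints whether the same object came back from the ready key.
        System.out.println(registerAndSelect(marker) == marker);
    }
}
```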
The second is the #processKey() method. Much of its logic is not what I care about right now, so it is elided:

protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    boolean result = true;

    try {
        if (close) {
            cancelledKey(sk, SocketStatus.STOP, false);
        } else if (sk.isValid() && attachment != null) {
            attachment.access();// make sure we don't time out valid sockets
            sk.attach(attachment);// cant remember why this is here
            NioChannel channel = attachment.getChannel();

            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, true, false);
                } else if (attachment.getComet()) {// this appears to be the comet support
                    ......

                } else {
                    // this is the branch of interest here
                    if (isWorkerAvailable()) {// this does not seem to be implemented yet
                        // #unreg() is neat: it keeps the selector from reporting the same event on this channel again and again
                        unreg(sk, attachment, sk.readyOps());
                        boolean close = (!processSocket(channel, null, true));

                        if (close) {
                            cancelledKey(sk, SocketStatus.DISCONNECT, false);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        } else {
            // invalid key
            cancelledKey(sk, SocketStatus.ERROR, false);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, SocketStatus.ERROR, false);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("", t);
    }
    return result;
}


protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
    reg(sk, attachment, sk.interestOps() & (~readyOps));
}


protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
    sk.interestOps(intops);
    attachment.interestOps(intops);
    attachment.setCometOps(intops);
}
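As I understand it, #unreg() neatly solves the problem of repeated I/O events: it removes the operations that have just fired from the key's interest set, so the selector stops reporting them while the request is being handled. The NIO test code I wrote myself suffers from exactly this problem.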
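
The effect can be reproduced outside Tomcat. The sketch below is my own illustration under stated assumptions, not the author's test code: it uses a java.nio.channels.Pipe so no network is needed, and UnregSketch, remainingOps() and demo() are hypothetical names. It selects a readable channel twice without reading from it, then clears the fired ops with the same `interestOps & ~readyOps` mask and selects again.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class UnregSketch {
    // Same arithmetic as unreg(): drop the ops that just fired from the interest set.
    static int remainingOps(int interestOps, int readyOps) {
        return interestOps & ~readyOps;
    }

    // Returns the key counts of three selects: before, repeated, and after unregistering.
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

                int first = selector.select(1000);
                selector.selectedKeys().clear();

                // The byte has not been read, so the key is reported again.
                int second = selector.select(1000);
                int ready = key.readyOps();
                selector.selectedKeys().clear();

                // Clear the fired ops; the unread byte no longer triggers the selector.
                key.interestOps(remainingOps(key.interestOps(), ready));
                int third = selector.selectNow();

                return new int[] {first, second, third};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With OP_READ and OP_WRITE both set and only OP_READ ready, remainingOps() leaves just OP_WRITE. In Tomcat's case the interest set is restored later, once the request has been processed and the socket should be polled again.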
With that, the flow reaches the final step on this path, the #processSocket() method:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
    try {
        KeyAttachment attachment = (KeyAttachment) socket.getAttachment(false);
        attachment.setCometNotify(false); // will get reset upon next reg
        // use a SocketProcessor, recycled from processorCache when one is available
        SocketProcessor sc = processorCache.poll();
        if (sc == null)
            sc = new SocketProcessor(socket, status);
        else
            sc.reset(socket, status);
        if (dispatch && getExecutor() != null)// if a ThreadPoolExecutor is configured, run the task on it
            getExecutor().execute(sc);
        else
            sc.run();
    } catch (RejectedExecutionException rx) {
        log.warn("Socket processing request was rejected for:" + socket, rx);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
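
Stripped of the Tomcat specifics, the dispatch decision in #processSocket() comes down to "hand the task to the executor if there is one, otherwise run it inline, and report a rejection as false". A minimal sketch of just that decision follows; DispatchSketch and process() are hypothetical names, and the real method also handles recycling and logging.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

final class DispatchSketch {
    // Returns false when the executor refuses the task, true otherwise.
    static boolean process(Runnable task, Executor executor, boolean dispatch) {
        try {
            if (dispatch && executor != null) {
                // Hand the work to the pool so the calling (poller) thread is not blocked.
                executor.execute(task);
            } else {
                // No pool configured, or inline processing requested: run on the calling thread.
                task.run();
            }
        } catch (RejectedExecutionException rx) {
            // The pool and its queue are full; the caller decides what to do with the socket.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Executor rejecting = r -> { throw new RejectedExecutionException("pool is full"); };
        // Prints the result of dispatching to an executor that always rejects.
        System.out.println(process(() -> {}, rejecting, true));
    }
}
```

In #processKey() a false result from #processSocket() leads to cancelledKey(sk, SocketStatus.DISCONNECT, false), so a rejected task ends with the connection being dropped rather than queued indefinitely.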
SocketProcessor's #run() method is not listed here. At its end it hands the flow over to the Http11NioProtocol class through the statements below, where handler is the reference to Http11NioProtocol:
SocketState state = SocketState.OPEN;
state = (status==null)?handler.process(socket):handler.event(socket,status);
Finally, here is a short summary of how the Acceptor and the Poller handle a request; see the figure below:
posted on 2010-12-08 08:48 by 臭美, category: Tomcat