Continuing the flow from above, the request has now reached the Poller's #register() method.

public void register(final NioChannel socket) {
    socket.setPoller(this);
    // KeyAttachment wraps the NioChannel's information; it is likewise pooled rather than left to the GC
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
    ka.reset(this, socket, getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    // Initialize the PollerEvent; again pooled rather than left to the GC
    PollerEvent r = eventCache.poll();
    // this is what OP_REGISTER turns into.
    // we are interested in read events
    ka.interestOps(SelectionKey.OP_READ);
    if (r == null)
        r = new PollerEvent(socket, ka, OP_REGISTER);
    else
        r.reset(socket, ka, OP_REGISTER);
    // add the event to the Poller
    addEvent(r);
}


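public void addEvent(Runnable event) {
    // put the event on the queue
    events.offer(event);
    // increment wakeupCounter; the result is 0 only if the Poller set it to -1
    // before a blocking select, and in that case the selector must be woken up
    if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
}
This is fairly easy to follow: the NioChannel is wrapped in an OP_REGISTER event and queued on the Poller, so that the Poller's #run() method can then process the events that were added to it.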

public void run() {
    while (running) {
        try {
            while (paused && (!close)) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            boolean hasEvents = false;

            hasEvents = (hasEvents | events());
            // Time to terminate?
            if (close) {
                timeout(0, false);
                break;
            }

            try {
                if (!close) {
                    if (wakeupCounter.get() > 0) {
                        // return immediately with the keys of the channels that are already I/O ready
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.keys().size();
                        // setting wakeupCounter to -1 here means the next addEvent() will wake the selector
                        wakeupCounter.set(-1);
                        // blocking select
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }

                if (close) {
                    timeout(0, false);
                    selector.close();
                    break;
                }
            } catch (NullPointerException x) {
                // sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null)
                    throw x;
                continue;
            } catch (CancelledKeyException x) {
                // sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null)
                    throw x;
                continue;
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error("", x);
                continue;
            }
            // either we timed out or we woke up, process events first
            if (keyCount == 0)
                hasEvents = (hasEvents | events());

            Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
                    : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // this KeyAttachment is the one that was set up in #register()
                KeyAttachment attachment = (KeyAttachment) sk.attachment();
                attachment.access();
                iterator.remove();
                // the flow continues here
                processKey(sk, attachment);
            }// while

            // process timeouts
            timeout(keyCount, hasEvents);
            if (oomParachute > 0 && oomParachuteData == null)
                checkParachute();
        } catch (OutOfMemoryError oom) {
            try {
                oomParachuteData = null;
                releaseCaches();
                log.error("", oom);
            } catch (Throwable oomt) {
                try {
                    System.err.println(oomParachuteMsg);
                    oomt.printStackTrace();
                } catch (Throwable letsHopeWeDontGetHere) {
                    ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                }
            }
        }
    }// while

    synchronized (this) {
        this.notifyAll();
    }
    stopLatch.countDown();
}
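The interplay between addEvent() and the select branch of run() is easier to see in isolation. Below is a minimal sketch of that handshake, not Tomcat's code: the class name WakeupSketch, the selectOnce() method and the wakeups field are hypothetical names introduced only for illustration, and the rest of the Poller is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class WakeupSketch {
    final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    final AtomicLong wakeupCounter = new AtomicLong(0);
    final Selector selector;
    int wakeups = 0; // hypothetical counter, only here to make the behaviour observable

    WakeupSketch() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same shape as addEvent() above: only the increment that lands on 0
    // (that is, -1 -> 0) pays for a selector.wakeup().
    void addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            wakeups++;
            selector.wakeup();
        }
    }

    // Same shape as the select branch in run(): if events arrived while the
    // poller was busy, do not block; otherwise announce "about to block" with -1.
    int selectOnce(long timeoutMillis) {
        try {
            int keyCount;
            if (wakeupCounter.get() > 0) {
                keyCount = selector.selectNow();
            } else {
                wakeupCounter.set(-1);
                keyCount = selector.select(timeoutMillis);
            }
            wakeupCounter.set(0);
            return keyCount;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WakeupSketch p = new WakeupSketch();
        p.addEvent(() -> {});            // poller is "busy": counter goes 0 -> 1
        System.out.println("wakeups while busy: " + p.wakeups);
        p.selectOnce(5000);              // counter > 0, so this takes the selectNow() path
        p.wakeupCounter.set(-1);         // simulate a poller that is about to block
        p.addEvent(() -> {});            // counter goes -1 -> 0
        System.out.println("wakeups while blocked: " + p.wakeups);
    }
}
```

The point of the counter, as far as I can tell, is to avoid calling selector.wakeup() for every queued event: only the first event that arrives while the Poller is blocked (or about to block) triggers it.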
Two calls inside Poller#run() deserve a closer look: #events() and #processKey():

public boolean events() {
    boolean result = false;
    // synchronized (events) {
    Runnable r = null;
    // the return value reports whether the event queue held any events
    result = (events.size() > 0);

    while ((r = events.poll()) != null) {
        try {
            // execute the PollerEvent's #run()
            r.run();

            if (r instanceof PollerEvent) {
                ((PollerEvent) r).reset();
                // recycle the PollerEvent
                eventCache.offer((PollerEvent) r);
            }
        } catch (Throwable x) {
            log.error("", x);
        }
    }
    // events.clear();
    // }
    return result;
}
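The "pooled rather than left to the GC" pattern shows up in both #register() (keyCache.poll(), eventCache.poll()) and #events() (eventCache.offer()). Here is a minimal sketch of that recycle loop, assuming a simplified event type: PoolSketch, Event, acquire() and release() are hypothetical names, not Tomcat classes or methods.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class PoolSketch {
    // Hypothetical stand-in for PollerEvent: a reusable object with reset methods.
    static final class Event {
        Object payload;
        void reset(Object p) { payload = p; }
        void reset() { payload = null; }
    }

    final ConcurrentLinkedQueue<Event> eventCache = new ConcurrentLinkedQueue<>();

    // Like register(): take a cached instance if there is one, otherwise allocate.
    Event acquire(Object payload) {
        Event e = eventCache.poll();
        if (e == null) {
            e = new Event();
        }
        e.reset(payload);
        return e;
    }

    // Like events(): clear the state and hand the instance back to the cache.
    void release(Event e) {
        e.reset();
        eventCache.offer(e);
    }

    public static void main(String[] args) {
        PoolSketch pool = new PoolSketch();
        Event first = pool.acquire("a");
        pool.release(first);
        Event second = pool.acquire("b");
        System.out.println("reused: " + (first == second));
    }
}
```

Resetting on release matters as much as resetting on acquire: a cached object that still points at an old channel would keep that channel reachable.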
#events() executes the #run() method of the PollerEvent that belongs to each SocketChannel, and that method is where the SocketChannel gets registered for OP_READ:

public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            // register the SocketChannel for OP_READ
            socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
                    key);
        } catch (Exception x) {
            log.error("", x);
        }
    } else {
        // this branch is presumably there for comet support; skipped for now

        ......

    }// end if
}// run
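The three-argument register() call above is what makes sk.attachment() in Poller#run() return the KeyAttachment. A minimal sketch of that round trip follows. It uses a java.nio.channels.Pipe instead of a real socket so that it runs without a network; that substitution, the class name AttachmentSketch and the method registerAndSelect() are my own choices for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class AttachmentSketch {
    // Registers the pipe's read end for OP_READ with an attachment, makes it
    // readable, and returns the attachment that the selected key carries.
    static Object registerAndSelect(Object attachment) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ, attachment);
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
                if (selector.select(5000) == 0) {
                    return null; // nothing became ready within the timeout
                }
                SelectionKey sk = selector.selectedKeys().iterator().next();
                return sk.isReadable() ? sk.attachment() : null;
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object ka = new Object(); // plays the role of the KeyAttachment
        System.out.println("same attachment: " + (registerAndSelect(ka) == ka));
    }
}
```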
The second is the #processKey() method. Much of its flow is not of interest to me right now, so those parts are elided:

protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    boolean result = true;

    try {
        if (close) {
            cancelledKey(sk, SocketStatus.STOP, false);
        } else if (sk.isValid() && attachment != null) {
            attachment.access();// make sure we don't time out valid sockets
            sk.attach(attachment);// cant remember why this is here
            NioChannel channel = attachment.getChannel();

            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, true, false);
                } else if (attachment.getComet()) {// presumably comet support
                    ......

                } else {
                    // this is the branch of interest for now
                    if (isWorkerAvailable()) {// this does not seem to be implemented yet
                        // #unreg() is a neat trick: it keeps the selector from
                        // selecting the same event on this channel over and over
                        unreg(sk, attachment, sk.readyOps());
                        boolean close = (!processSocket(channel, null, true));

                        if (close) {
                            cancelledKey(sk, SocketStatus.DISCONNECT, false);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        } else {
            // invalid key
            cancelledKey(sk, SocketStatus.ERROR, false);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, SocketStatus.ERROR, false);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("", t);
    }
    return result;
}


protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
    reg(sk, attachment, sk.interestOps() & (~readyOps));
}


protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
    sk.interestOps(intops);
    attachment.interestOps(intops);
    attachment.setCometOps(intops);
}
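As I understand it, #unreg() neatly solves the problem of repeated IO events: it removes the operations that just fired from the key's interest set, so the selector stops reporting them while the request is being handled. The NIO test code I wrote myself has exactly this problem.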
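The effect can be reproduced outside Tomcat. The sketch below again uses a Pipe instead of a socket (an assumption made so that it runs standalone; UnregSketch and run() are hypothetical names). It selects three times without ever reading the pending byte: once normally, once more to show the repeated event, and once after applying the same interestOps() & ~readyOps expression that #unreg() uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class UnregSketch {
    // Returns the key counts of three consecutive selects:
    // { first select, repeat without unreg, repeat after unreg }.
    static int[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey sk = pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

                int first = selector.select(5000);
                selector.selectedKeys().clear();

                // The byte is still unread, so the channel is reported as ready again.
                int repeated = selector.selectNow();
                selector.selectedKeys().clear();

                // Same expression as unreg(): drop the ops that just fired.
                sk.interestOps(sk.interestOps() & ~sk.readyOps());
                int afterUnreg = selector.selectNow();

                return new int[] {first, repeated, afterUnreg};
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]);
    }
}
```

In the Poller this matters because the actual read happens later on a worker thread. Without unreg() the Poller loop would keep selecting the same readable key until that worker got around to draining the socket.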
With that, the flow arrives at the last step on the Poller side, the #processSocket() method:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
    try {
        KeyAttachment attachment = (KeyAttachment) socket.getAttachment(false);
        attachment.setCometNotify(false); // will get reset upon next reg
        // use a (pooled) SocketProcessor
        SocketProcessor sc = processorCache.poll();
        if (sc == null)
            sc = new SocketProcessor(socket, status);
        else
            sc.reset(socket, status);
        if (dispatch && getExecutor() != null)// if a ThreadPoolExecutor is configured, use it to run the processor
            getExecutor().execute(sc);
        else
            sc.run();
    } catch (RejectedExecutionException rx) {
        log.warn("Socket processing request was rejected for:" + socket, rx);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
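The dispatch decision and the meaning of the boolean result can be sketched with nothing but java.util.concurrent. This is a simplified illustration, not Tomcat's method: DispatchSketch is a hypothetical class, and its processSocket() takes the task and the executor as parameters instead of using the endpoint's pooled SocketProcessor and getExecutor().

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

class DispatchSketch {
    // Runs the task on the executor when dispatching is requested and an
    // executor exists, otherwise inline on the calling thread. Returns false
    // when the executor rejects the task, which the caller treats as
    // "close this socket".
    static boolean processSocket(Runnable sc, Executor executor, boolean dispatch) {
        try {
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException rx) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int[] ran = {0};
        // No executor: the task runs inline.
        boolean inline = processSocket(() -> ran[0]++, null, true);
        // An executor that is "full": every task is rejected.
        Executor full = r -> { throw new RejectedExecutionException("pool and queue are full"); };
        boolean rejected = processSocket(() -> ran[0]++, full, true);
        System.out.println(inline + " " + rejected + " " + ran[0]);
    }
}
```

This matches what #processKey() does with the result: a false return becomes cancelledKey(sk, SocketStatus.DISCONNECT, false).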
The #run() method of SocketProcessor is not listed here. At its end it hands the flow over to the Http11NioProtocol class through the statements below, where handler is the endpoint's reference into Http11NioProtocol:
SocketState state = SocketState.OPEN;
state = (status==null)?handler.process(socket):handler.event(socket,status);
Finally, here is a short summary of how the Acceptor and the Poller handle a connection; see the figure below:
posted on 2010-12-08 08:48 by 臭美, filed under: Tomcat