NIO中的Selector封裝了底層的系統調用,其中wakeup用于喚醒阻塞在select方法上的線程,它的實現很簡單,在linux上就是創建一個管道并加入poll的fd集合,wakeup就是往管道里寫一個字節,那么阻塞的poll方法有數據可讀就立即返回。證明這一點很簡單,strace即可知道:
public class SelectorTest {
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
selector.wakeup();
}
}
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
selector.wakeup();
}
}
使用strace調用,只關心write的系統調用
sudo strace -f -e write java SelectorTest
輸出:
Process 29181 attached
Process 29182 attached
Process 29183 attached
Process 29184 attached
Process 29185 attached
Process 29186 attached
Process 29187 attached
Process 29188 attached
Process 29189 attached
Process 29190 attached
Process 29191 attached
[pid 29181] write(36, "\1", 1) = 1
Process 29191 detached
Process 29184 detached
Process 29181 detached
Process 29182 attached
Process 29183 attached
Process 29184 attached
Process 29185 attached
Process 29186 attached
Process 29187 attached
Process 29188 attached
Process 29189 attached
Process 29190 attached
Process 29191 attached
[pid 29181] write(36, "\1", 1) = 1
Process 29191 detached
Process 29184 detached
Process 29181 detached
有的同學說了,怎么證明這個write是wakeup方法調用的,而不是其他方法呢,這個很好證明,我們多調用幾次:
public class SelectorTest {
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
selector.wakeup();
selector.selectNow();
selector.wakeup();
selector.selectNow();
selector.wakeup();
}
}
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
selector.wakeup();
selector.selectNow();
selector.wakeup();
selector.selectNow();
selector.wakeup();
}
}
修改程序調用三次wakeup,心細的朋友肯定注意到我們還調用了兩次selectNow,這是因為在兩次成功的select方法之間調用wakeup多次都只算做一次,為了顯示3次write,這里就每次調用前select一下將前一次寫入的字節讀到,同樣執行上面的strace調用,輸出:
Process 29303 attached
Process 29304 attached
Process 29305 attached
Process 29306 attached
Process 29307 attached
Process 29308 attached
Process 29309 attached
Process 29310 attached
Process 29311 attached
Process 29312 attached
Process 29313 attached
[pid 29303] write(36, "\1", 1) = 1
[pid 29303] write(36, "\1", 1) = 1
[pid 29303] write(36, "\1", 1) = 1
Process 29313 detached
Process 29309 detached
Process 29306 detached
Process 29303 detached
Process 29304 attached
Process 29305 attached
Process 29306 attached
Process 29307 attached
Process 29308 attached
Process 29309 attached
Process 29310 attached
Process 29311 attached
Process 29312 attached
Process 29313 attached
[pid 29303] write(36, "\1", 1) = 1
[pid 29303] write(36, "\1", 1) = 1
[pid 29303] write(36, "\1", 1) = 1
Process 29313 detached
Process 29309 detached
Process 29306 detached
Process 29303 detached
果然是3次write的系統調用,都是寫入一個字節,如果我們去掉selectNow,那么三次wakeup還是等于一次:
public class SelectorTest {
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
selector.wakeup();
selector.wakeup();
selector.wakeup();
}
}
public static void main(String[] args) throws Exception {
Selector selector = Selector.open();
selector.wakeup();
selector.wakeup();
selector.wakeup();
}
}
輸出:
Process 29331 attached
Process 29332 attached
Process 29333 attached
Process 29334 attached
Process 29335 attached
Process 29336 attached
Process 29337 attached
Process 29338 attached
Process 29339 attached
Process 29340 attached
Process 29341 attached
[pid 29331] write(36, "\1", 1) = 1
Process 29341 detached
Process 29337 detached
Process 29334 detached
Process 29331 detached
Process 29332 attached
Process 29333 attached
Process 29334 attached
Process 29335 attached
Process 29336 attached
Process 29337 attached
Process 29338 attached
Process 29339 attached
Process 29340 attached
Process 29341 attached
[pid 29331] write(36, "\1", 1) = 1
Process 29341 detached
Process 29337 detached
Process 29334 detached
Process 29331 detached
wakeup方法的API說明沒有欺騙我們。wakeup方法的API還告訴我們,如果當前Selector沒有阻塞在select方法上,那么本次wakeup調用會在下一次select阻塞的時候生效,這個道理很簡單,wakeup方法寫入一個字節,下次poll等待的時候立即發現可讀并返回,因此不會阻塞。
具體到源碼級別,在linux平臺上的wakeup方法其實調用了pipe創建了管道,wakeup調用了EPollArrayWrapper的interrupt方法:
public void interrupt()
{
interrupt(outgoingInterruptFD);
}
{
interrupt(outgoingInterruptFD);
}
實際調用的是interrupt(fd)的native方法,查看EPollArrayWrapper.c可見清晰的write系統調用:
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
}
}