java io以及unix io模型
IO分兩個階段:1.通知內核準備數據。2.數據從內核緩沖區拷貝到應用緩沖區
根據這2點IO類型可以分成:
1.阻塞IO,在兩個階段上面都是阻塞的。
2.非阻塞IO,在第1階段,程序不斷的輪詢直到數據準備好,第2階段還是阻塞的
3.IO復用,在第1階段,當一個或者多個IO準備就緒時,通知程序,第2階段還是阻塞的,在第1階段還是輪詢實現的,只是
所有的IO都集中在一個地方,這個地方進行輪詢
4.信號IO,當數據準備完畢的時候,信號通知程序數據準備完畢,第2階段阻塞
5.異步IO,1,2都不阻塞,windows的iocp是真正的異步IO
Java NIO
java NIO在linux上面是用epoll實現的,屬于IO復用類型。
Selector:IO的多路復用器,通道需要向其注冊,在數據準備階段由他進行狀態的輪詢
看關鍵點1的代碼:SelectionKey:通道向selector注冊后會創建一個SelectionKey,SelectionKey維系通道和selector的關系.SelectionKey包含兩個整數集一個為interest集合,一個為ready集合.interest集合指定Selector需要監聽的事件.ready集合為Selector為SelectorKey監聽后已經準備就緒的可以進行操作的事件.ready集合特別需要注意,這個里面可能有阻塞的行為,如OP_READ事件,只是暗示可讀,但是真正的數據此時還沒有到來,此時就會阻塞了。
epoll有三個方法epoll_create(),epoll_ctl(),epoll_wait():
int epoll_create(int size)
創建epoll文件,用于存放epoll_ctl注冊的event。
關鍵點1.register()方法中有同步語句,可能當前線程就阻塞了,線程切換會有性能的損耗。
關鍵點2.fdToKey是個key為fd,value為selectionKey的map
關鍵點3.Updator是個內部類:
在SelectionImpl.register()中最后還要執行SelectionKeyImpl.interestOps()方法注冊操作事件(前面添加的時候操作事件是為0的)
至此register()動作已經完成。
select()

代碼如下:
epoll有三個方法epoll_create(),epoll_ctl(),epoll_wait():
int epoll_create(int size)
創建epoll文件,用于存放epoll_ctl注冊的event。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
注冊fd到epfd中,op為操作數(add,mod,delete) ,epoll_event為注冊感興趣的事件,這個里面也注冊了回調函數,當對應的fd的設備ready時,就調用回調函數,將這個fd加入epfd的ready set當中,epoll_wait()一直就在那里等待ready set。
JDK中向Selector注冊事件流程(以linux epoll為例):

關鍵代碼如下:注冊fd到epfd中,op為操作數(add,mod,delete) ,epoll_event為注冊感興趣的事件,這個里面也注冊了回調函數,當對應的fd的設備ready時,就調用回調函數,將這個fd加入epfd的ready set當中,epoll_wait()一直就在那里等待ready set。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
等待ready set,當準備好有數據的時候返回數據的個數,epoll_event為感興趣的事件集合,maxevents為事件集合的個數。
等待ready set,當準備好有數據的時候返回數據的個數,epoll_event為感興趣的事件集合,maxevents為事件集合的個數。
JDK中向Selector注冊事件流程(以linux epoll為例):
AbstractSelectableChannel.register():
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
if (!isOpen()) //channel關閉拋異常
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)//不合理的注冊值,拋異常
throw new IllegalArgumentException();
synchronized (regLock) {//有鎖就有可能有線程的阻塞和切換 關鍵點1
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel); //查看原來有沒有注冊過
if (k != null) {//注冊過直接設置后返回
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);//沒有注冊的話執行selector的register
addKey(k);
}
return k;
}
}
SelectionImpl.register():
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {//又是鎖,可能阻塞或者線程切換
implRegister(k);
}
k.interestOps(ops);
return k;
}
EPollSelectorImpl.implRegister():
protected void implRegister(SelectionKeyImpl ski) {
SelChImpl ch = ski.channel;
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);//關鍵點2 fd和Selectionkey對應的map
pollWrapper.add(ch);
keys.add(ski);
}
EPollArrayWrapper.add():
void add(SelChImpl channel) {
synchronized (updateList) {
updateList.add(new Updator(channel, EPOLL_CTL_ADD));//關鍵點3
}
}
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
if (!isOpen()) //channel關閉拋異常
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)//不合理的注冊值,拋異常
throw new IllegalArgumentException();
synchronized (regLock) {//有鎖就有可能有線程的阻塞和切換 關鍵點1
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel); //查看原來有沒有注冊過
if (k != null) {//注冊過直接設置后返回
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);//沒有注冊的話執行selector的register
addKey(k);
}
return k;
}
}
SelectionImpl.register():
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {//又是鎖,可能阻塞或者線程切換
implRegister(k);
}
k.interestOps(ops);
return k;
}
EPollSelectorImpl.implRegister():
protected void implRegister(SelectionKeyImpl ski) {
SelChImpl ch = ski.channel;
fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);//關鍵點2 fd和Selectionkey對應的map
pollWrapper.add(ch);
keys.add(ski);
}
EPollArrayWrapper.add():
void add(SelChImpl channel) {
synchronized (updateList) {
updateList.add(new Updator(channel, EPOLL_CTL_ADD));//關鍵點3
}
}
關鍵點1.register()方法中有同步語句,可能當前線程就阻塞了,線程切換會有性能的損耗。
關鍵點2.fdToKey是個key為fd,value為selectionKey的map
關鍵點3.Updator是個內部類:
private static class Updator {
SelChImpl channel;
int opcode;
int events;
Updator(SelChImpl channel, int opcode, int events) {
this.channel = channel;
this.opcode = opcode;
this.events = events;
}
Updator(SelChImpl channel, int opcode) {
this(channel, opcode, 0);
}
}
代表channel以及對channel的操作類型以及操作的事件,新注冊的操作類型為add,操作事件為0,代表沒有事件。SelChImpl channel;
int opcode;
int events;
Updator(SelChImpl channel, int opcode, int events) {
this.channel = channel;
this.opcode = opcode;
this.events = events;
}
Updator(SelChImpl channel, int opcode) {
this(channel, opcode, 0);
}
}
在SelectionImpl.register()中最后還要執行SelectionKeyImpl.interestOps()方法注冊操作事件(前面添加的時候操作事件是為0的)
SelectionKeyImpl:
public SelectionKey interestOps(int ops) {
ensureValid();//檢測是否可用
return nioInterestOps(ops);
}
SelectionKey nioInterestOps(int ops) { // package-private
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
//真正的進行epoll感興趣事件的注冊
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
//進行java的ops和epoll的ops轉換,每個具體的channel是有區別的
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
sk.selector.putEventOps(sk, newOps);
}
void setInterest(SelChImpl channel, int mask) {
synchronized (updateList) {
// if the previous pending operation is to add this file descriptor
// to epoll then update its event set
if (updateList.size() > 0) {//關鍵點1
Updator last = updateList.getLast();
//這個肯定是剛才那個注冊的channel,直接進行事件的更新
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
last.events = mask;
return;
}
}
// update existing registration 程序運行到這里的話,說明前面已經有更新的updator加入了,這里只好新加入一個
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
}
}
關鍵點1.這里是考慮到并發的問題了,但是我有個疑問為什么注冊要分兩個步驟執行,為什么不直接在EPollSelectorImpl.implRegister()加入 updateList.add(new Updator(channel, EPOLL_CTL_ADD, mask))呢?public SelectionKey interestOps(int ops) {
ensureValid();//檢測是否可用
return nioInterestOps(ops);
}
SelectionKey nioInterestOps(int ops) { // package-private
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
//真正的進行epoll感興趣事件的注冊
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
//進行java的ops和epoll的ops轉換,每個具體的channel是有區別的
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
sk.selector.putEventOps(sk, newOps);
}
void setInterest(SelChImpl channel, int mask) {
synchronized (updateList) {
// if the previous pending operation is to add this file descriptor
// to epoll then update its event set
if (updateList.size() > 0) {//關鍵點1
Updator last = updateList.getLast();
//這個肯定是剛才那個注冊的channel,直接進行事件的更新
if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
last.events = mask;
return;
}
}
// update existing registration 程序運行到這里的話,說明前面已經有更新的updator加入了,這里只好新加入一個
updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
}
}
至此register()動作已經完成。
select()
代碼如下:
public int select(long timeout)
throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
//反注冊過程 刪除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);//關鍵點1
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
//遍歷取消的key
Set cks = cancelledKeys();
synchronized (cks) {
Iterator i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//底層的取消實現
implDereg(ski);
} catch (SocketException se) {
IOException ioe = new IOException(
"Error deregistering key");
ioe.initCause(se);
throw ioe;
} finally {
//刪除取消的key
i.remove();
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(new Integer(fd));//hashmap中去除
pollWrapper.release(ch);//這步很關鍵
ski.setIndex(-1);
keys.remove(ski);//總的key set去除
selectedKeys.remove(ski);//已經準備好的set去除
deregister((AbstractSelectionKey)ski);//移除channel的集合
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
void release(SelChImpl channel) {
//空閑隊列刪除了
synchronized (updateList) {
// flush any pending updates
for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
if (it.next().channel == channel) {
it.remove();
}
}
// remove from the idle set (if present)
idleSet.remove(channel);
//調用native 通知本channel對應的fd被被刪除
// remove from epoll (if registered)
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
}
}
throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
//反注冊過程 刪除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);//關鍵點1
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
//遍歷取消的key
Set cks = cancelledKeys();
synchronized (cks) {
Iterator i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//底層的取消實現
implDereg(ski);
} catch (SocketException se) {
IOException ioe = new IOException(
"Error deregistering key");
ioe.initCause(se);
throw ioe;
} finally {
//刪除取消的key
i.remove();
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
fdToKey.remove(new Integer(fd));//hashmap中去除
pollWrapper.release(ch);//這步很關鍵
ski.setIndex(-1);
keys.remove(ski);//總的key set去除
selectedKeys.remove(ski);//已經準備好的set去除
deregister((AbstractSelectionKey)ski);//移除channel的集合
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
void release(SelChImpl channel) {
//空閑隊列刪除了
synchronized (updateList) {
// flush any pending updates
for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
if (it.next().channel == channel) {
it.remove();
}
}
// remove from the idle set (if present)
idleSet.remove(channel);
//調用native 通知本channel對應的fd被被刪除
// remove from epoll (if registered)
epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
}
}
int poll(long timeout) throws IOException {
updateRegistrations();//在poll前 先把update里面不需要的條目處理掉
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
void updateRegistrations() {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {//這個表示interest事件為0
boolean added = idleSet.add(u.channel); //關鍵點1
//先加入到idleSet里面 如果是這次加入的 而且操作行為是mod,那么就是個刪除這個channel對應的fd
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
} else {
//關鍵點 2
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
//如果idleSet不為空而且有這個Updator 說明關鍵點1處代碼返回true,操作行為為add,mod的話在epollCtl會被刪除掉
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
}
}
}
}
updateRegistrations();//在poll前 先把update里面不需要的條目處理掉
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
void updateRegistrations() {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {//這個表示interest事件為0
boolean added = idleSet.add(u.channel); //關鍵點1
//先加入到idleSet里面 如果是這次加入的 而且操作行為是mod,那么就是個刪除這個channel對應的fd
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
} else {
//關鍵點 2
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
//如果idleSet不為空而且有這個Updator 說明關鍵點1處代碼返回true,操作行為為add,mod的話在epollCtl會被刪除掉
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
}
}
}
}
updateList是在EPollArrayWrapper.setInterest()和add()方法中添加的,當有多個線程的時候,通過channel register()有可能剛加入的Updator會被updateRegistrations()得到,得到的就是Channel第一次register的updatot,這個時候events為0,被加入到idleSet接著setInterest()被調用(channel register()最后一步),多了一個updator,這個時候再執行Selector.select(),顯然會到關鍵點2操作行為add,是沒有問題的。執行完updateRegistrations()方法,然后就epollWait()方法的調用,這個就是epoll的native方法了
總結:
1.Channel的register方法最后加入channel的感興趣的事件到updatorList中
2.Selector的select的方法主要是對updatorList進行運作,首先去除所有cancelkey(),也就刪除了對應的底層的updatorList的條目,然后迭代updatorList根據updator的event事件進行處理,也就是執行epoll的epollCtl方法,之后就是執行epollWait等待epollCtl的channel對應的callback函數的執行了。
總結:
1.Channel的register方法最后加入channel的感興趣的事件到updatorList中
2.Selector的select的方法主要是對updatorList進行運作,首先去除所有cancelkey(),也就刪除了對應的底層的updatorList的條目,然后迭代updatorList根據updator的event事件進行處理,也就是執行epoll的epollCtl方法,之后就是執行epollWait等待epollCtl的channel對應的callback函數的執行了。