java io以及unix io模型

          IO分兩個階段:
          1.通知內核準備數據。2.數據從內核緩沖區拷貝到應用緩沖區

          根據這2點IO類型可以分成:
          1.阻塞IO,在兩個階段上面都是阻塞的。
          2.非阻塞IO,在第1階段,程序不斷的輪詢直到數據準備好,第2階段還是阻塞的
          3.IO復用,在第1階段,當一個或者多個IO準備就緒時,通知程序,第2階段還是阻塞的,在第1階段還是輪詢實現的,只是
          所有的IO都集中在一個地方,這個地方進行輪詢
          4.信號IO,當數據準備完畢的時候,信號通知程序數據準備完畢,第2階段阻塞
          5.異步IO,1,2都不阻塞,windows的iocp是真正的異步IO


          Java NIO
          java NIO在linux上面是用epoll實現的,屬于IO復用類型。
          Selector:IO的多路復用器,通道需要向其注冊,在數據準備階段由他進行狀態的輪詢
          SelectionKey:通道向selector注冊后會創建一個SelectionKey,SelectionKey維系通道和selector的關系.SelectionKey包含兩個整數集一個為interest集合,一個為ready集合.interest集合指定Selector需要監聽的事件.ready集合為Selector為SelectorKey監聽后已經準備就緒的可以進行操作的事件.ready集合特別需要注意,這個里面可能有阻塞的行為,如OP_READ事件,只是暗示可讀,但是真正的數據此時還沒有到來,此時就會阻塞了。

          epoll有三個方法epoll_create(),epoll_ctl(),epoll_wait():
          int epoll_create(int size)
                  創建epoll文件,用于存放epoll_ctl注冊的event。
          int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
                  注冊fd到epfd中,op為操作數(add,mod,delete) ,epoll_event為注冊感興趣的事件,這個里面也注冊了回調函數,當對應的fd的設備ready時,就調用回調函數,將這個fd加入epfd的ready set當中,epoll_wait()一直就在那里等待ready set。
          int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
                  等待ready set,當準備好有數據的時候返回數據的個數,epoll_event為感興趣的事件集合,maxevents為事件集合的個數。

          JDK中向Selector注冊事件流程(以linux epoll為例):
          關鍵代碼如下:
          AbstractSelectableChannel.register():
              
          public final SelectionKey register(Selector sel, int ops,
                                                 Object att)
                  
          throws ClosedChannelException
              {
                  
          if (!isOpen()) //channel關閉拋異常
                      throw new ClosedChannelException();
                  
          if ((ops & ~validOps()) != 0)//不合理的注冊值,拋異常
                      throw new IllegalArgumentException();
                  
          synchronized (regLock) {//有鎖就有可能有線程的阻塞和切換  關鍵點1
                      if (blocking)
                          
          throw new IllegalBlockingModeException();
                      SelectionKey k 
          = findKey(sel); //查看原來有沒有注冊過
                      if (k != null) {//注冊過直接設置后返回
                          k.interestOps(ops);
                          k.attach(att);
                      }
                      
          if (k == null) {
                          
          // New registration
                          k = ((AbstractSelector)sel).register(this, ops, att);//沒有注冊的話執行selector的register
                          addKey(k);
                      }
                      
          return k;
                  }
              }


          SelectionImpl.register():
              
          protected final SelectionKey register(AbstractSelectableChannel ch,
                                                    
          int ops,
                                                    Object attachment)
              {
                  
          if (!(ch instanceof SelChImpl))
                      
          throw new IllegalSelectorException();
                  SelectionKeyImpl k 
          = new SelectionKeyImpl((SelChImpl)ch, this);
                  k.attach(attachment);
                  
          synchronized (publicKeys) {//又是鎖,可能阻塞或者線程切換
                      implRegister(k);
                  }
                  k.interestOps(ops);
                  
          return k;
              }


          EPollSelectorImpl.implRegister():
          protected void implRegister(SelectionKeyImpl ski) {
                  SelChImpl ch 
          = ski.channel;
                  fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
          //關鍵點2 fd和Selectionkey對應的map
                  pollWrapper.add(ch);
                  keys.add(ski);
          }

          EPollArrayWrapper.add():
          void add(SelChImpl channel) {
                  
          synchronized (updateList) {
                      updateList.add(
          new Updator(channel, EPOLL_CTL_ADD));//關鍵點3
                  }
          }

          關鍵點1.register()方法中有同步語句,可能當前線程就阻塞了,線程切換會有性能的損耗。
          關鍵點2.fdToKey是個key為fd,value為selectionKey的map
          關鍵點3.Updator是個內部類:
              private static class Updator {
                  SelChImpl channel;
                  
          int opcode;
                  
          int events;
                  Updator(SelChImpl channel, 
          int opcode, int events) {
                      
          this.channel = channel;
                      
          this.opcode = opcode;
                      
          this.events = events;
                  }
                  Updator(SelChImpl channel, 
          int opcode) {
                      
          this(channel, opcode, 0);
                  }
              }
          代表channel以及對channel的操作類型以及操作的事件,新注冊的操作類型為add,操作事件為0,代表沒有事件。
          在SelectionImpl.register()中最后還要執行SelectionKeyImpl.interestOps()方法注冊操作事件(前面添加的時候操作事件是為0的)
          SelectionKeyImpl:
            
          public SelectionKey interestOps(int ops) {
                  ensureValid();
          //檢測是否可用
                  return nioInterestOps(ops);
              }


              SelectionKey nioInterestOps(
          int ops) {      // package-private
                  if ((ops & ~channel().validOps()) != 0)
                      
          throw new IllegalArgumentException();
                  
          //真正的進行epoll感興趣事件的注冊
                  channel.translateAndSetInterestOps(ops, this);
                  interestOps 
          = ops;
                  
          return this;
              }


              
          public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
                  
          int newOps = 0;
                  
          //進行java的ops和epoll的ops轉換,每個具體的channel是有區別的
                  if ((ops & SelectionKey.OP_READ) != 0)
                      newOps 
          |= PollArrayWrapper.POLLIN;
                  
          if ((ops & SelectionKey.OP_WRITE) != 0)
                      newOps 
          |= PollArrayWrapper.POLLOUT;
                  
          if ((ops & SelectionKey.OP_CONNECT) != 0)
                      newOps 
          |= PollArrayWrapper.POLLCONN;
                  sk.selector.putEventOps(sk, newOps);
              }

             
          void setInterest(SelChImpl channel, int mask) {

                  
          synchronized (updateList) {
                      
          // if the previous pending operation is to add this file descriptor
                      
          // to epoll then update its event set
                      if (updateList.size() > 0) {//關鍵點1
                          Updator last = updateList.getLast();
                          
          //這個肯定是剛才那個注冊的channel,直接進行事件的更新
                          if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
                              last.events 
          = mask;
                              
          return;
                          }
                      }

                      
          // update existing registration 程序運行到這里的話,說明前面已經有更新的updator加入了,這里只好新加入一個
                      updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
                  }
              }
          關鍵點1.這里是考慮到并發的問題了,但是我有個疑問為什么注冊要分兩個步驟執行,為什么不直接在EPollSelectorImpl.implRegister()加入 updateList.add(new Updator(channel, EPOLL_CTL_ADD, mask))呢?
          至此register()動作已經完成。


          select()



          代碼如下:
              public int select(long timeout)
                  
          throws IOException
              {
                  
          if (timeout < 0)
                      
          throw new IllegalArgumentException("Negative timeout");
                  
          return lockAndDoSelect((timeout == 0? -1 : timeout);
              }

              
          private int lockAndDoSelect(long timeout) throws IOException {
                  
          synchronized (this) {
                      
          if (!isOpen())
                          
          throw new ClosedSelectorException();
                      
          synchronized (publicKeys) {
                          
          synchronized (publicSelectedKeys) {
                              
          return doSelect(timeout);
                          }
                      }
                  }
              }
             
          protected int doSelect(long timeout)
                  
          throws IOException
              {
                  
          if (closed)
                      
          throw new ClosedSelectorException();

                  
          //反注冊過程 刪除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
                  processDeregisterQueue();
                  
          try {
                      begin();
                      pollWrapper.poll(timeout);
          //關鍵點1
                  } finally {
                      end();
                  }
                  processDeregisterQueue();
                  
          int numKeysUpdated = updateSelectedKeys();
                  
          if (pollWrapper.interrupted()) {
                      
          // Clear the wakeup pipe
                      pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
                      
          synchronized (interruptLock) {
                          pollWrapper.clearInterrupted();
                          IOUtil.drain(fd0);
                          interruptTriggered 
          = false;
                      }
                  }
                  
          return numKeysUpdated;
              }


                
          void processDeregisterQueue() throws IOException {
                  
          // Precondition: Synchronized on this, keys, and selectedKeys
                  
          //遍歷取消的key
                  Set cks = cancelledKeys();
                  
          synchronized (cks) {
                      Iterator i 
          = cks.iterator();
                      
          while (i.hasNext()) {
                          SelectionKeyImpl ski 
          = (SelectionKeyImpl)i.next();
                          
          try {
                               //底層的取消實現
                              implDereg(ski);
                          } 
          catch (SocketException se) {
                              IOException ioe 
          = new IOException(
                                  
          "Error deregistering key");
                              ioe.initCause(se);
                              
          throw ioe;
                          } 
          finally {
                              
          //刪除取消的key
                              i.remove();
                          }
                      }
                  }
              }

                 
          protected void implDereg(SelectionKeyImpl ski) throws IOException {
                  
          assert (ski.getIndex() >= 0);
                  SelChImpl ch 
          = ski.channel;
                  
          int fd = ch.getFDVal();
                  fdToKey.remove(
          new Integer(fd));//hashmap中去除
                  pollWrapper.release(ch);//這步很關鍵
                  ski.setIndex(-1);
                  keys.remove(ski);
          //總的key set去除
                  selectedKeys.remove(ski);//已經準備好的set去除
                  deregister((AbstractSelectionKey)ski);//移除channel的集合
                  SelectableChannel selch = ski.channel();
                  
          if (!selch.isOpen() && !selch.isRegistered())
                      ((SelChImpl)selch).kill();
              }



              
          void release(SelChImpl channel) {

                  
          //空閑隊列刪除了
                  synchronized (updateList) {
                      
          // flush any pending updates
                      for (Iterator<Updator> it = updateList.iterator(); it.hasNext();) {
                          
          if (it.next().channel == channel) {
                              it.remove();
                          }
                      }

                      
          // remove from the idle set (if present)
                      idleSet.remove(channel);

                       
          //調用native 通知本channel對應的fd被被刪除
                      
          // remove from epoll (if registered)
                      epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
                  }
              }
          看關鍵點1的代碼:
              int poll(long timeout) throws IOException {
                  updateRegistrations();
          //在poll前 先把update里面不需要的條目處理掉
                  updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
                  
          for (int i=0; i<updated; i++) {
                      
          if (getDescriptor(i) == incomingInterruptFD) {
                          interruptedIndex 
          = i;
                          interrupted 
          = true;
                          
          break;
                      }
                  }
                  
          return updated;
              }


              
          void updateRegistrations() {
                  
          synchronized (updateList) {
                      Updator u 
          = null;
                      
          while ((u = updateList.poll()) != null) {
                          SelChImpl ch 
          = u.channel;
                          
          if (!ch.isOpen())
                              
          continue;

                          
          // if the events are 0 then file descriptor is put into "idle
                          
          // set" to prevent it being polled
                          if (u.events == 0) {//這個表示interest事件為0
                              boolean added = idleSet.add(u.channel); //關鍵點1
                              
          //先加入到idleSet里面 如果是這次加入的 而且操作行為是mod,那么就是個刪除這個channel對應的fd
                              
          // if added to idle set then remove from epoll if registered
                              if (added && (u.opcode == EPOLL_CTL_MOD))
                                  epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 
          0);
                          } 
          else {
                              
          //關鍵點 2
                              
          // events are specified. If file descriptor was in idle set
                              
          // it must be re-registered (by converting opcode to ADD)
                              boolean idle = false;
                              
          //如果idleSet不為空而且有這個Updator  說明關鍵點1處代碼返回true,操作行為為add,mod的話在epollCtl會被刪除掉
                              if (!idleSet.isEmpty())
                                  idle 
          = idleSet.remove(u.channel);
                              
          int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
                              epollCtl(epfd, opcode, ch.getFDVal(), u.events);
                          }
                      }
                  }
              }

          updateList是在EPollArrayWrapper.setInterest()和add()方法中添加的,當有多個線程的時候,通過channel register()有可能剛加入的Updator會被updateRegistrations()得到,得到的就是Channel第一次register的updatot,這個時候events為0,被加入到idleSet接著setInterest()被調用(channel register()最后一步),多了一個updator,這個時候再執行Selector.select(),顯然會到關鍵點2操作行為add,是沒有問題的。執行完updateRegistrations()方法,然后就epollWait()方法的調用,這個就是epoll的native方法了


          總結:
          1.Channel的register方法最后加入channel的感興趣的事件到updatorList中
          2.Selector的select的方法主要是對updatorList進行運作,首先去除所有cancelkey(),也就刪除了對應的底層的updatorList的條目,然后迭代updatorList根據updator的event事件進行處理,也就是執行epoll的epollCtl方法,之后就是執行epollWait等待epollCtl的channel對應的callback函數的執行了。

          posted on 2011-09-29 13:15 nod0620 閱讀(1637) 評論(2)  編輯  收藏

          評論

          # re: java io以及unix io模型 2016-02-27 20:09 tianapple

          這么好的文章,沒有頂  回復  更多評論   

          # re: java io以及unix io模型 2016-02-27 20:09 tianapple

          //反注冊過程 刪除取消的通道的key(interest keys,ready keys(也就是selected keys),channel的key set)
          processDeregisterQueue();
          這段調用是合意,實在沒看明白,能否說名一下
            回復  更多評論   


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345

          導航

          統計

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 台湾省| 邯郸市| 孝昌县| 尚志市| 桦川县| 惠东县| 佛山市| 华池县| 乾安县| 万州区| 子长县| 太谷县| 彭泽县| 来凤县| 乐业县| 佛冈县| 银川市| 神木县| 靖远县| 苗栗市| 马尔康县| 南召县| 大洼县| 安吉县| 阿尔山市| 新巴尔虎左旗| 乐都县| 江源县| 临邑县| 镇赉县| 兴山县| 长武县| 龙泉市| 云浮市| 花莲县| 台湾省| 兴隆县| 汪清县| 兴化市| 瓦房店市| 杭州市|