Dict.CN 在線詞典, 英語學習, 在線翻譯

          都市淘沙者

          荔枝FM Everyone can be host

          統計

          留言簿(23)

          積分與排名

          優秀學習網站

          友情連接

          閱讀排行榜

          評論排行榜

          Scalable io in java 轉

          原文http://blog.csdn.net/liu251/archive/2008/07/06/2618752.aspx

          本文可看成是對Doug Lea Scalable IO in Java一文的翻譯。

          當前分布式計算 Web Services盛行天下,這些網絡服務的底層都離不開對socket的操作。他們都有一個共同的結構: 1. Read request 2. Decode request 3. Process service 4. Encode reply 5. Send reply

          經典的網絡服務的設計如下圖,在每個線程中完成對數據的處理:  

          但這種模式在用戶負載增加時,性能將下降非常的快。我們需要重新尋找一個新的方案,保持數據處理的流暢,很顯然,事件觸發機制是最好的解決辦法,當有事件發生時,會觸動handler,然后開始數據的處理。

          Reactor模式類似于AWT中的Event處理:  

          Reactor模式參與者

          1.Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理,這類似于AWT的thread 2.Handler 是負責非堵塞行為,類似于AWT ActionListeners;同時負責將handlers與event事件綁定,類似于AWT addActionListener

          如圖:  

          Java的NIO為reactor模式提供了實現的基礎機制,它的Selector當發現某個channel有數據時,會通過SlectorKey來告知我們,在此我們實現事件和handler的綁定。

          我們來看看Reactor模式代碼:


          import java.io.IOException;
          import java.net.InetAddress;
          import java.net.InetSocketAddress;
          import java.nio.channels.SelectionKey;
          import java.nio.channels.Selector;
          import java.nio.channels.ServerSocketChannel;
          import java.nio.channels.SocketChannel;
          import java.util.Iterator;
          import java.util.Set;

          import sun.net.www.protocol.http.Handler;

          public class  Reactor implements Runnable
              
              
          final Selector selector; 
              
          final ServerSocketChannel serverSocket; 
              
              Reactor(
          int port) throws IOException {
                  selector 
          = Selector.open(); 
                  serverSocket 
          = ServerSocketChannel.open(); 
                  InetSocketAddress address 
          = new InetSocketAddress(InetAddress.getLocalHost(),port); 
                  serverSocket.socket().bind(address); serverSocket.configureBlocking(
          false); 
                  
          // 向selector注冊該channel
                  SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT); 
                  
          // logger.debug("-->Start serverSocket.register!");
                  
          // 利用sk的attache功能綁定Acceptor 如果有事情,觸發Acceptor
                  sk.attach(new Acceptor()); 
                  
          // logger.debug("-->attach(new Acceptor()!");
              }



              
          public void run() // normally in a new Thread
                  try {
                      
          while (!Thread.interrupted()){
                          selector.select();
                          Set selected 
          = selector.selectedKeys();
                          Iterator it 
          = selected.iterator();// Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。
                          while (it.hasNext())// 來一個事件 第一次觸發一個accepter線程// 以后觸發SocketReadHandler
                          dispatch((SelectionKey)(it.next()));
                          selected.clear();
                      }

                  }

                  
          catch (IOException ex) {
                      
          // logger.debug("reactor stop!"+ex);}
                  }

              }


              
          // 運行Acceptor或SocketReadHandler 
              void dispatch(SelectionKey k) {
                  Runnable r 
          =(Runnable)(k.attachment());
                  
          if (r != null){
                      r.run();
                  }
           
              }
           

              
          class Acceptor implements Runnable {
                
          public void run(){
                  
          try {
                      
          //logger.debug("-->ready for accept!");
                      SocketChannel c = serverSocket.accept();
                      
          if (c != null)//調用Handler來處理channel        
                          new SocketReadHandler(selector, c);
                      }
          catch(IOException ex){
                           
          //logger.debug("accept stop!"+ex);    
                      }
             
                }

              }

          }

           

          以上代碼中巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler。

          再看看Handler代碼:

           


          import java.io.IOException;
          import java.nio.ByteBuffer;
          import java.nio.channels.SelectionKey;
          import java.nio.channels.Selector;
          import java.nio.channels.SocketChannel;

          import org.apache.log4j.Logger;

          public class SocketReadHandler implements Runnable {
              
              
          public static Logger logger = Logger.getLogger(SocketReadHandler.class);
              
          private Test test=new Test();
              
          final SocketChannel socket;
              
          final SelectionKey sk;
              
          static final int READING = 0, SENDING = 1;
              
          int state = READING;

              
          public SocketReadHandler(Selector sel, SocketChannel c)throws IOException {
                  socket 
          = c;
                  socket.configureBlocking(
          false);  
                  sk 
          = socket.register(sel, 0);    
                  
          //將SelectionKey綁定為本Handler 下一步有事件觸發時,將調用本類的run方法。//參看dispatch(SelectionKey k) 
                  sk.attach(this); 
                  
          //同時將SelectionKey標記為可讀,以便讀取。
                  sk.interestOps(SelectionKey.OP_READ);
                  sel.wakeup();
              }

              
              
          public void run() {
                 
          try{
                     readRequest() ;
                 }
          catch(Exception ex){
                     logger.debug(
          "readRequest error"+ex);
                 }
                 
              }


              
          /** * 處理讀取data * @param key * @throws Exception */ 
              
          private void readRequest() throws Exception {
                  ByteBuffer input 
          = ByteBuffer.allocate(1024);
                  input.clear();
                  
          try{
                      
          int bytesRead = socket.read(input);
                      
          // 
                  
                      
          //激活線程池 處理這些requestrequestHandle(new Request(socket,btt));
                      
          //..
                  }
          catch(Exception e){
                  }

              }

          }


           

          注意在Handler里面又執行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發生時,將直接觸發Handler.從而開始了數據的讀 處理 寫 發出等流程處理。

          將數據讀出后,可以將這些數據處理線程做成一個線程池,這樣,數據讀出后,立即扔到線程池中,這樣加速處理速度:

          更進一步,我們可以使用多個Selector分別處理連接和讀事件。

          一個高性能的Java網絡服務機制就要形成,激動人心的集群并行計算即將實現。

          Scalable IO in Java原文

           

           

          posted on 2009-02-12 14:35 都市淘沙者 閱讀(1134) 評論(0)  編輯  收藏 所屬分類: 多線程并發編程

          主站蜘蛛池模板: 磐石市| 香格里拉县| 寻乌县| 广水市| 驻马店市| 刚察县| 九江县| 东辽县| 吉安市| 萍乡市| 乐平市| 汾阳市| 多伦县| 依安县| 宜春市| 长春市| 安溪县| 九龙城区| 关岭| 汉阴县| 连州市| 金山区| 浮山县| 从江县| 济阳县| 吴堡县| 江西省| 江川县| 浮山县| 会同县| 永仁县| 科技| 茶陵县| 沽源县| 广水市| 沅陵县| 平远县| 余庆县| 迭部县| 枣强县| 巍山|