新的起點 新的開始

          快樂生活 !

          (轉好文章)Reactor模式和NIO

          本文來自 ZhangLiHai.Com Blog

          當前分布式計算 Web Services盛行天下,這些網絡服務的底層都離不開對socket的操作。他們都有一個共同的結構:
          1. Read request
          2. Decode request
          3. Process service
          4. Encode reply
          5. Send reply

          經典的網絡服務的設計如下圖,在每個線程中完成對數據的處理:
          ?

          但這種模式在用戶負載增加時,性能將下降非常的快。我們需要重新尋找一個新的方案,保持數據處理的流暢,很顯然,事件觸發機制是最好的解決辦法,當有事件發生時,會觸動handler,然后開始數據的處理。

          Reactor模式類似于AWT中的Event處理:
          ?

          Reactor模式參與者

          1.Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理,這類似于AWT的thread
          2.Handler 是負責非堵塞行為,類似于AWT ActionListeners;同時負責將handlers與event事件綁定,類似于AWT addActionListener

          如圖:
          ?

          Java的NIO為reactor模式提供了實現的基礎機制,它的Selector當發現某個channel有數據時,會通過SlectorKey來告知我們,在此我們實現事件和handler的綁定。

          我們來看看Reactor模式代碼:


          public class Reactor implements Runnable{

            final Selector selector;
            final ServerSocketChannel serverSocket;

            Reactor(int port) throws IOException {
              selector = Selector.open();
              serverSocket = ServerSocketChannel.open();
              InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
              serverSocket.socket().bind(address);

              serverSocket.configureBlocking(false);
              //向selector注冊該channel
               SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

              logger.debug("-->Start serverSocket.register!");

              //利用sk的attache功能綁定Acceptor 如果有事情,觸發Acceptor
              sk.attach(new Acceptor());
              logger.debug("-->attach(new Acceptor()!");
            }


            public void run() { // normally in a new Thread
              try {
              while (!Thread.interrupted())
              {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。
                while (it.hasNext())
                  //來一個事件 第一次觸發一個accepter線程
                  //以后觸發SocketReadHandler
                  dispatch((SelectionKey)(it.next()));
                  selected.clear();
                }
              }catch (IOException ex) {
                  logger.debug("reactor stop!"+ex);
              }
            }

            //運行Acceptor或SocketReadHandler
            void dispatch(SelectionKey k) {
              Runnable r = (Runnable)(k.attachment());
              if (r != null){
                // r.run();

              }
            }

            class Acceptor implements Runnable { // inner
              public void run() {
              try {
                logger.debug("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                  //調用Handler來處理channel
                  new SocketReadHandler(selector, c);
                }
              catch(IOException ex) {
                logger.debug("accept stop!"+ex);
              }
              }
            }
          }

          以上代碼中巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler。

          再看看Handler代碼:

          public class SocketReadHandler implements Runnable {

            public static Logger logger = Logger.getLogger(SocketReadHandler.class);

            private Test test=new Test();

            final SocketChannel socket;
            final SelectionKey sk;

             static final int READING = 0, SENDING = 1;
            int state = READING;

            public SocketReadHandler(Selector sel, SocketChannel c)
              throws IOException {

              socket = c;

              socket.configureBlocking(false);
               sk = socket.register(sel, 0);

              //將SelectionKey綁定為本Handler 下一步有事件觸發時,將調用本類的run方法。
              //參看dispatch(SelectionKey k)
              sk.attach(this);

              //同時將SelectionKey標記為可讀,以便讀取。
              sk.interestOps(SelectionKey.OP_READ);
              sel.wakeup();
            }

            public void run() {
              try{
              // test.read(socket,input);
                readRequest() ;
              }catch(Exception ex){
              logger.debug("readRequest error"+ex);
              }
            }


          /**
          * 處理讀取data
          * @param key
          * @throws Exception
          */
          private void readRequest() throws Exception {

            ByteBuffer input = ByteBuffer.allocate(1024);
            input.clear();
            try{

              int bytesRead = socket.read(input);

              ......

              //激活線程池 處理這些request
              requestHandle(new Request(socket,btt));

              .....


            }catch(Exception e) {
            }

          }

          注意在Handler里面又執行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發生時,將直接觸發Handler.從而開始了數據的讀 處理 寫 發出等流程處理。

          將數據讀出后,可以將這些數據處理線程做成一個線程池,這樣,數據讀出后,立即扔到線程池中,這樣加速處理速度:

          更進一步,我們可以使用多個Selector分別處理連接和讀事件。

          一個高性能的Java網絡服務機制就要形成,激動人心的集群并行計算即將實現。

          posted on 2007-03-27 11:04 advincenting 閱讀(726) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           

          公告

          Locations of visitors to this pageBlogJava
        1. 首頁
        2. 新隨筆
        3. 聯系
        4. 聚合
        5. 管理
        6. <2007年3月>
          25262728123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          統計

          常用鏈接

          留言簿(13)

          隨筆分類(71)

          隨筆檔案(179)

          文章檔案(13)

          新聞分類

          IT人的英語學習網站

          JAVA站點

          優秀個人博客鏈接

          官網學習站點

          生活工作站點

          最新隨筆

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 黔江区| 宁化县| 克东县| 马尔康县| 施甸县| 泌阳县| 城口县| 南陵县| 关岭| 化隆| 永定县| 南澳县| 来宾市| 平江县| 岳阳县| 凤阳县| 上虞市| 昌江| 澄江县| 青田县| 辽宁省| 岳西县| 南华县| 浠水县| 东乡| 桐庐县| 乌兰县| 太原市| 牡丹江市| 辉县市| 景洪市| 木里| 通化市| 江源县| 德阳市| 博罗县| 汕尾市| 绵阳市| 伊川县| 双城市| 巴林左旗|