weidagang2046的專欄

          物格而后知致
          隨筆 - 8, 文章 - 409, 評(píng)論 - 101, 引用 - 0
          數(shù)據(jù)加載中……

          Reactor模式和NIO

          Reactor模式和NIO

          板橋里人 jdon.com 2002/11/08

          本文可看成是對(duì)Doug Lea Scalable IO in Java一文的翻譯。

          當(dāng)前分布式計(jì)算 Web Services盛行天下,這些網(wǎng)絡(luò)服務(wù)的底層都離不開對(duì)socket的操作。他們都有一個(gè)共同的結(jié)構(gòu):
          1. Read request
          2. Decode request
          3. Process service
          4. Encode reply
          5. Send reply

          經(jīng)典的網(wǎng)絡(luò)服務(wù)的設(shè)計(jì)如下圖,在每個(gè)線程中完成對(duì)數(shù)據(jù)的處理:

          但這種模式在用戶負(fù)載增加時(shí),性能將下降非常的快。我們需要重新尋找一個(gè)新的方案,保持?jǐn)?shù)據(jù)處理的流暢,很顯然,事件觸發(fā)機(jī)制是最好的解決辦法,當(dāng)有事件發(fā)生時(shí),會(huì)觸動(dòng)handler,然后開始數(shù)據(jù)的處理。

          Reactor模式類似于AWT中的Event處理:

          Reactor模式參與者

          1.Reactor 負(fù)責(zé)響應(yīng)IO事件,一旦發(fā)生,廣播發(fā)送給相應(yīng)的Handler去處理,這類似于AWT的thread
          2.Handler 是負(fù)責(zé)非堵塞行為,類似于AWT ActionListeners;同時(shí)負(fù)責(zé)將handlers與event事件綁定,類似于AWT addActionListener

          如圖:

          Java的NIO為reactor模式提供了實(shí)現(xiàn)的基礎(chǔ)機(jī)制,它的Selector當(dāng)發(fā)現(xiàn)某個(gè)channel有數(shù)據(jù)時(shí),會(huì)通過SlectorKey來告知我們,在此我們實(shí)現(xiàn)事件和handler的綁定。

          我們來看看Reactor模式代碼:


          public class Reactor implements Runnable{

            final Selector selector;
            final ServerSocketChannel serverSocket;

            Reactor(int port) throws IOException {
              selector = Selector.open();
              serverSocket = ServerSocketChannel.open();
              InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
              serverSocket.socket().bind(address);

              serverSocket.configureBlocking(false);
              //向selector注冊(cè)該channel
               SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

              logger.debug("-->Start serverSocket.register!");

              //利用sk的attache功能綁定Acceptor 如果有事情,觸發(fā)Acceptor
              sk.attach(new Acceptor());
              logger.debug("-->attach(new Acceptor()!");
            }


            public void run() { // normally in a new Thread
              try {
              while (!Thread.interrupted())
              {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果發(fā)現(xiàn)channel有OP_ACCEPT或READ事件發(fā)生,下列遍歷就會(huì)進(jìn)行。
                while (it.hasNext())
                  //來一個(gè)事件 第一次觸發(fā)一個(gè)accepter線程
                  //以后觸發(fā)SocketReadHandler
                  dispatch((SelectionKey)(it.next()));
                  selected.clear();
                }
              }catch (IOException ex) {
                  logger.debug("reactor stop!"+ex);
              }
            }

            //運(yùn)行Acceptor或SocketReadHandler
            void dispatch(SelectionKey k) {
              Runnable r = (Runnable)(k.attachment());
              if (r != null){
                // r.run();

              }
            }

            class Acceptor implements Runnable { // inner
              public void run() {
              try {
                logger.debug("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                  //調(diào)用Handler來處理channel
                  new SocketReadHandler(selector, c);
                }
              catch(IOException ex) {
                logger.debug("accept stop!"+ex);
              }
              }
            }
          }

          以上代碼中巧妙使用了SocketChannel的attach功能,將Hanlder和可能會(huì)發(fā)生事件的channel鏈接在一起,當(dāng)發(fā)生事件時(shí),可以立即觸發(fā)相應(yīng)鏈接的Handler。

          再看看Handler代碼:

          public class SocketReadHandler implements Runnable {

            public static Logger logger = Logger.getLogger(SocketReadHandler.class);

            private Test test=new Test();

            final SocketChannel socket;
            final SelectionKey sk;

             static final int READING = 0, SENDING = 1;
            int state = READING;

            public SocketReadHandler(Selector sel, SocketChannel c)
              throws IOException {

              socket = c;

              socket.configureBlocking(false);
               sk = socket.register(sel, 0);

              //將SelectionKey綁定為本Handler 下一步有事件觸發(fā)時(shí),將調(diào)用本類的run方法。
              //參看dispatch(SelectionKey k)
              sk.attach(this);

              //同時(shí)將SelectionKey標(biāo)記為可讀,以便讀取。
              sk.interestOps(SelectionKey.OP_READ);
              sel.wakeup();
            }

            public void run() {
              try{
              // test.read(socket,input);
                readRequest() ;
              }catch(Exception ex){
              logger.debug("readRequest error"+ex);
              }
            }


          /**
          * 處理讀取data
          * @param key
          * @throws Exception
          */
          private void readRequest() throws Exception {

            ByteBuffer input = ByteBuffer.allocate(1024);
            input.clear();
            try{

              int bytesRead = socket.read(input);

              ......

              //激活線程池 處理這些request
              requestHandle(new Request(socket,btt));

              .....


            }catch(Exception e) {
            }

          }

          注意在Handler里面又執(zhí)行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發(fā)生時(shí),將直接觸發(fā)Handler.從而開始了數(shù)據(jù)的讀 處理 寫 發(fā)出等流程處理。

          將數(shù)據(jù)讀出后,可以將這些數(shù)據(jù)處理線程做成一個(gè)線程池,這樣,數(shù)據(jù)讀出后,立即扔到線程池中,這樣加速處理速度:

          更進(jìn)一步,我們可以使用多個(gè)Selector分別處理連接和讀事件。

          一個(gè)高性能的Java網(wǎng)絡(luò)服務(wù)機(jī)制就要形成,激動(dòng)人心的集群并行計(jì)算即將實(shí)現(xiàn)。

          轉(zhuǎn)自:http://www.jdon.com/concurrent/reactor.htm

          posted on 2005-06-11 21:09 weidagang2046 閱讀(195) 評(píng)論(0)  編輯  收藏 所屬分類: Java

          主站蜘蛛池模板: 驻马店市| 博客| 兴业县| 宜良县| 通化县| 浦东新区| 榕江县| 黄梅县| 鄂伦春自治旗| 汪清县| 凤山县| 宜州市| 望都县| 棋牌| 明星| 微博| 合阳县| 龙陵县| 佛冈县| 黄石市| 宿松县| 新巴尔虎右旗| 盱眙县| 林芝县| 孟津县| 河北区| 襄汾县| 垣曲县| 乃东县| 肥西县| 囊谦县| 北宁市| 赤城县| 南宁市| 盐池县| 海淀区| 兰西县| 大冶市| 会昌县| 石城县| 东阳市|