新的起點(diǎn) 新的開始

          快樂生活 !

          (轉(zhuǎn)好文章)Reactor模式和NIO

          本文來自 ZhangLiHai.Com Blog

          當(dāng)前分布式計算 Web Services盛行天下,這些網(wǎng)絡(luò)服務(wù)的底層都離不開對socket的操作。他們都有一個共同的結(jié)構(gòu):
          1. Read request
          2. Decode request
          3. Process service
          4. Encode reply
          5. Send reply

          經(jīng)典的網(wǎng)絡(luò)服務(wù)的設(shè)計如下圖,在每個線程中完成對數(shù)據(jù)的處理:
          ?

          但這種模式在用戶負(fù)載增加時,性能將下降非常的快。我們需要重新尋找一個新的方案,保持?jǐn)?shù)據(jù)處理的流暢,很顯然,事件觸發(fā)機(jī)制是最好的解決辦法,當(dāng)有事件發(fā)生時,會觸動handler,然后開始數(shù)據(jù)的處理。

          Reactor模式類似于AWT中的Event處理:
          ?

          Reactor模式參與者

          1.Reactor 負(fù)責(zé)響應(yīng)IO事件,一旦發(fā)生,廣播發(fā)送給相應(yīng)的Handler去處理,這類似于AWT的thread
          2.Handler 是負(fù)責(zé)非堵塞行為,類似于AWT ActionListeners;同時負(fù)責(zé)將handlers與event事件綁定,類似于AWT addActionListener

          如圖:
          ?

          Java的NIO為reactor模式提供了實(shí)現(xiàn)的基礎(chǔ)機(jī)制,它的Selector當(dāng)發(fā)現(xiàn)某個channel有數(shù)據(jù)時,會通過SlectorKey來告知我們,在此我們實(shí)現(xiàn)事件和handler的綁定。

          我們來看看Reactor模式代碼:


          public class Reactor implements Runnable{

            final Selector selector;
            final ServerSocketChannel serverSocket;

            Reactor(int port) throws IOException {
              selector = Selector.open();
              serverSocket = ServerSocketChannel.open();
              InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
              serverSocket.socket().bind(address);

              serverSocket.configureBlocking(false);
              //向selector注冊該channel
               SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

              logger.debug("-->Start serverSocket.register!");

              //利用sk的attache功能綁定Acceptor 如果有事情,觸發(fā)Acceptor
              sk.attach(new Acceptor());
              logger.debug("-->attach(new Acceptor()!");
            }


            public void run() { // normally in a new Thread
              try {
              while (!Thread.interrupted())
              {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果發(fā)現(xiàn)channel有OP_ACCEPT或READ事件發(fā)生,下列遍歷就會進(jìn)行。
                while (it.hasNext())
                  //來一個事件 第一次觸發(fā)一個accepter線程
                  //以后觸發(fā)SocketReadHandler
                  dispatch((SelectionKey)(it.next()));
                  selected.clear();
                }
              }catch (IOException ex) {
                  logger.debug("reactor stop!"+ex);
              }
            }

            //運(yùn)行Acceptor或SocketReadHandler
            void dispatch(SelectionKey k) {
              Runnable r = (Runnable)(k.attachment());
              if (r != null){
                // r.run();

              }
            }

            class Acceptor implements Runnable { // inner
              public void run() {
              try {
                logger.debug("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                  //調(diào)用Handler來處理channel
                  new SocketReadHandler(selector, c);
                }
              catch(IOException ex) {
                logger.debug("accept stop!"+ex);
              }
              }
            }
          }

          以上代碼中巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發(fā)生事件的channel鏈接在一起,當(dāng)發(fā)生事件時,可以立即觸發(fā)相應(yīng)鏈接的Handler。

          再看看Handler代碼:

          public class SocketReadHandler implements Runnable {

            public static Logger logger = Logger.getLogger(SocketReadHandler.class);

            private Test test=new Test();

            final SocketChannel socket;
            final SelectionKey sk;

             static final int READING = 0, SENDING = 1;
            int state = READING;

            public SocketReadHandler(Selector sel, SocketChannel c)
              throws IOException {

              socket = c;

              socket.configureBlocking(false);
               sk = socket.register(sel, 0);

              //將SelectionKey綁定為本Handler 下一步有事件觸發(fā)時,將調(diào)用本類的run方法。
              //參看dispatch(SelectionKey k)
              sk.attach(this);

              //同時將SelectionKey標(biāo)記為可讀,以便讀取。
              sk.interestOps(SelectionKey.OP_READ);
              sel.wakeup();
            }

            public void run() {
              try{
              // test.read(socket,input);
                readRequest() ;
              }catch(Exception ex){
              logger.debug("readRequest error"+ex);
              }
            }


          /**
          * 處理讀取data
          * @param key
          * @throws Exception
          */
          private void readRequest() throws Exception {

            ByteBuffer input = ByteBuffer.allocate(1024);
            input.clear();
            try{

              int bytesRead = socket.read(input);

              ......

              //激活線程池 處理這些request
              requestHandle(new Request(socket,btt));

              .....


            }catch(Exception e) {
            }

          }

          注意在Handler里面又執(zhí)行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發(fā)生時,將直接觸發(fā)Handler.從而開始了數(shù)據(jù)的讀 處理 寫 發(fā)出等流程處理。

          將數(shù)據(jù)讀出后,可以將這些數(shù)據(jù)處理線程做成一個線程池,這樣,數(shù)據(jù)讀出后,立即扔到線程池中,這樣加速處理速度:

          更進(jìn)一步,我們可以使用多個Selector分別處理連接和讀事件。

          一個高性能的Java網(wǎng)絡(luò)服務(wù)機(jī)制就要形成,激動人心的集群并行計算即將實(shí)現(xiàn)。

          posted on 2007-03-27 11:04 advincenting 閱讀(725) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           

          公告

          Locations of visitors to this pageBlogJava
        1. 首頁
        2. 新隨筆
        3. 聯(lián)系
        4. 聚合
        5. 管理
        6. <2007年3月>
          25262728123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          統(tǒng)計

          常用鏈接

          留言簿(13)

          隨筆分類(71)

          隨筆檔案(179)

          文章檔案(13)

          新聞分類

          IT人的英語學(xué)習(xí)網(wǎng)站

          JAVA站點(diǎn)

          優(yōu)秀個人博客鏈接

          官網(wǎng)學(xué)習(xí)站點(diǎn)

          生活工作站點(diǎn)

          最新隨筆

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 仁寿县| 那曲县| 饶平县| 增城市| 贵溪市| 东安县| 石泉县| 黄大仙区| 鹤岗市| 湄潭县| 德格县| 象山县| 保德县| 渭南市| 罗山县| 阿鲁科尔沁旗| 五常市| 永登县| 万安县| 梅河口市| 仪征市| 怀化市| 云林县| 库伦旗| 利辛县| 客服| 安泽县| 禹城市| 封丘县| 南江县| 资中县| 田东县| 孟州市| 泰安市| 紫金县| 丰城市| 西乌珠穆沁旗| 浮山县| 莎车县| 大足县| 靖江市|