莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          yanf4j引入了客戶端非阻塞API

          Posted on 2009-02-19 00:15 dennis 閱讀(1774) 評論(2)  編輯  收藏 所屬分類: javamy open-source
              yanf4j發(fā)布一個0.50-beta2版本,這個版本最重要的改進就是引入了客戶端連接非阻塞API,主要最近的工作要用到,所以添加了。兩個核心類TCPConnectorControllerUDPConnectorController分別用于TCP和UDP的客戶端連接控制。例如,現(xiàn)在的UDP echo client可以寫成:

               //客戶端echo handler
               class EchoClientHandler extends HandlerAdapter {

                  
          public void onReceive(Session udpSession, Object t) {
                      DatagramPacket datagramPacket 
          = (DatagramPacket) t;
                      System.out.println(
          "recv:" + new String(datagramPacket.getData()));
                  }

                  @Override
                  
          public void onMessageSent(Session session, Object t) {
                      System.out.println(
          "send:" + new String((byte[]) t));
                  }

              }

                 //連接代碼,并發(fā)送UDP包

                  UDPConnectorController connector 
          = new UDPConnectorController();
                  connector.setSoTimeout(
          1000);
                  connector.setHandler(
          new EchoClientHandler());
                  connector.connect(
          new InetSocketAddress(InetAddress.getByName(host),
                          port));
                  
          for (int i = 0; i < 10000; i++) {
                      String s 
          = "hello " + i;
                      DatagramPacket packet 
          = new DatagramPacket(s.getBytes(), s.length());
                      connector.send(packet);
                  }

              UDP不是面向連接的,因此connect方法僅僅是調(diào)用了底層DatagramChannel.connect方法,用來限制接收和發(fā)送的packet的遠程端點。

              再來看看TCPConnectorController的使用,同樣看Echo Client的實現(xiàn):
          //客戶端的echo handler
          class EchoHandler extends HandlerAdapter<String> {

                  @Override
                  
          public void onConnected(Session session) {
                      
          try {
                          
          //一連接就發(fā)送NUM個字符串
                          for (int i = 0; i < NUM; i++)
                              session.send(generateString(i));
                       } 
          catch (Exception e) {

                       }
                  }

                  
          public String generateString(int len) {
                      StringBuffer sb 
          = new StringBuffer();
                      
          for (int i = 0; i < MESSAGE_LEN; i++)
                          sb.append(i);
                      
          return sb.toString();
                  }

                  @Override
                  
          public void onReceive(Session session, String t) {
                      //打印接收到字符串
                      if (DEBUG)
                          System.out.println(
          "recv:" + t);
                      
                  }

              }


          //...連接API,TCPConnectorController示例
              Configuration configuration = new Configuration();
                  configuration.setTcpSessionReadBufferSize(
          256 * 1024); // 設(shè)置讀的緩沖區(qū)大小
              TCPConnectorController    connector = new TCPConnectorController(configuration,
                          
          new StringCodecFactory());
              connector.setHandler(
          new EchoHandler());
              connector.setCodecFactory(
          new StringCodecFactory());
             
          try {
                      connector.Connect(
          new InetSocketAddress("localhost"8080));
              } 
          catch (IOExceptione) {
                      e.printStackTrace();
              }

              注意,connect方法并不阻塞,而是立即返回,連接是否建立可以通過TCPConnectorController.isConnected()方法來判斷,因此通常你可能會這樣使用:

          try {
                      connector.Connect(
          new InetSocketAddress("localhost"8080));
                      
          while(!connector.isConnected())
                          ;
                  } 
          catch (Exception e) {
                      e.printStackTrace();
                  }

              來強制確保后面對connector的使用是已經(jīng)連接上的connector,然而更好的做法是在Handler的onConnected()回調(diào)方法中處理邏輯,因為這個方法僅僅在連接建立后才會被調(diào)用。
              兩個ConnectorController都有系列send方法,用于發(fā)送數(shù)據(jù):
          TCPConnectorController.send(Object msg) throws InterruptedException
          UDPConnectorController.send(DatagramPacket packet) 
          throws InterruptedException
          UDPConnectorController.send(SocketAddress targetAddr, Object msg)
          throws InterruptedException


              0.50-beta2帶來的另一個修改就是Session接口添加setReadBufferByteOrder方法,用于設(shè)置session接收緩沖區(qū)的字節(jié)序,默認(rèn)是網(wǎng)絡(luò)字節(jié)序,也就是大端法。這個方法建議在Handler的onSessionStarted回調(diào)方法中調(diào)用。

              在0.50-beta最重要的修改是引入了session發(fā)送隊列緩沖區(qū)的流量控制選項。默認(rèn)情況下,session的發(fā)送緩沖隊列是無界的,隊列的push和pop也全然不會阻塞。在設(shè)置了緩沖隊列的高低水位選項后即引入了發(fā)送流量控制,規(guī)則如下:
          a)當(dāng)發(fā)送隊列中的數(shù)據(jù)總量大于高水位標(biāo)記(highWaterMark),Session.send將阻塞
          b)在條件a的作用下,Session.send的阻塞將持續(xù)到發(fā)送隊列中的數(shù)據(jù)總量小于于低水位標(biāo)記(lowWaterMark)才解除。


          緩沖隊列高低水位的設(shè)置通過Controller的下列方法設(shè)置:
               public void setSessionWriteQueueHighWaterMark(int highWaterMark);

               
          public void setSessionWriteQueueLowWaterMark(int lowWaterMark);
           
          緩沖隊列的流量控制想法來自ACE的ACE_Message_Queue,是通過com.google.code.yanf4j.util.MessageQueue類實現(xiàn)的。

             0.50-beta還引入了Session.send(Object msg)的重載版本 Session.send(Object msg,long timeout),在超過timeout時間后send仍然阻塞時即終止send。注意,現(xiàn)在Session.send的這兩個方法都返回一個bool值來表示send成功與否,并且都將響應(yīng)中斷(僅限啟動了流量控制選項)拋出InterruptedException。

          評論

          # re: yanf4j引入了客戶端非阻塞API  回復(fù)  更多評論   

          2009-02-19 09:40 by Arbow
          呃。。。工作不僅僅要用到ACE,連Java也有啊?

          # re: yanf4j引入了客戶端非阻塞API  回復(fù)  更多評論   

          2009-02-19 09:46 by dennis
          @Arbow
          用java寫測試客戶端,畢竟比較熟悉
          主站蜘蛛池模板: 凤冈县| 遵义县| 峡江县| 岑溪市| 泗水县| 宁海县| 民县| 张家口市| 郁南县| 渝中区| 桐乡市| 台北市| 讷河市| 塔河县| 安泽县| 阿鲁科尔沁旗| 德州市| 芦溪县| 平武县| 泗洪县| 朝阳市| 金沙县| 开阳县| 富源县| 响水县| 镇坪县| 邓州市| 吉木乃县| 湘潭市| 屯门区| 延吉市| 遵义县| 永靖县| 新巴尔虎左旗| 巫山县| 贵南县| 保亭| 建湖县| 如东县| 广昌县| 北安市|