瘋狂

          STANDING ON THE SHOULDERS OF GIANTS
          posts - 481, comments - 486, trackbacks - 0, articles - 1
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          xsocket源碼解讀

          Posted on 2011-10-19 17:34 瘋狂 閱讀(6333) 評論(3)  編輯  收藏 所屬分類: 網絡通訊 、讀代碼

          關于xsocket可見于 我的另外一篇文章http://www.aygfsteel.com/freeman1984/archive/2011/04/25/302706.html,或者查看官網http://xsocket.org/
          當然閱讀xsocket需要一些線程,nio,niosocket,和java.util.concurrent(鎖,線程池等)包的一些知識。要不讀起來很費勁,建議先去了解下這些知識??梢栽谖业奈恼路诸恈oncurrent里面有一些,其他去網上找找。
          本文只讀了一個主要的流程,對于一些其他的代碼例如:ssl相關,讀數據相關沒有涉及,看有時間能補上。
          首先xsocket的幾個關鍵的類
          Server: 服務器端初始化線程池創建IoAcceptor
          IoAcceptor:采用while循環接收客戶端連接,并創建IoSocketDispatcher和IoChainableHandler
          IoSocketDispatcher:負責注冊SelectionKey以及事件的分發,并交給IoChainableHandler處理,通過一個while循環來處理注冊的SelectionKey事件。
          IHandler:事件處理,數據的讀寫等等。
          INonBlockingConnection客戶端接口。
          (1)首先看下Server創建:
          構造方法

          protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
           
          boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) 
          這個方法主要是初始化線程池,構件acceptor
                  defaultWorkerPool 
          = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
               workerpool 
          = defaultWorkerPool;
               
            
          if (sslContext != null{//是否使用ssl
             acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
             
            }
           else {
             acceptor 
          = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
            }



          其中線程池:使用jdk1.5以后的ThreadPoolExecutor,線程池最小默認2,最大100,QUEUE的大小默認也是100
          線程池最?。篗IN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
          SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
          TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));

          (2)構件acceptor細節,最后server啟動的時候會啟動acceptor監聽客戶端

           public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
                  .
                  serverChannel 
          = ServerSocketChannel.open();
                  
                  serverChannel.configureBlocking(
          true);
                  serverChannel.socket().setSoTimeout(
          0);  // accept method never times out
                  serverChannel.socket().setReuseAddress(isReuseAddress); 
                  .
                  serverChannel.socket().bind(address, backlog);
                  dispatcherPool 
          = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
                  .
              }


          (3)啟動server
          server.start();
          服務的啟動用了一個單獨的線程,這里面使用到了CountDownLatch可參見另外一篇關于CountDownLatch用法的文章:
          http://www.aygfsteel.com/freeman1984/archive/2011/07/04/353654.html
          使用CountDownLatch來控制server的啟動時間,操作多少時間為啟動就,默認是60秒,這里就不講CountDownLatch的代碼了
          整個啟動的方法如下,能看懂CountDownLatch的用法基本上就理解了。

          public static void start(IServer server, int timeoutSec) throws SocketTimeoutException {
            
            
            
          // start server within a dedicated thread 
            Thread t = new Thread(server);
            t.setName(
          "xServer");
            t.start();
          //請看下面的run方法分析
            
           }


          看看他的server線程的run方法:

          public void run() {
           
           acceptor.listen();
          //啟動前面創建的acceptor開始監聽客戶端連接
           
          }



          接著查看listen()方法:

          public void listen() throws IOException {
               callback.onConnected();
          //通知server已經啟動
               accept();//接受客戶端連接
              }

          }



          查看 accept();很明了,使用一個while循環監聽客戶端連接,并建立可客戶端相關的處理類:

           

          while (isOpen.get()) {

              
          // blocking accept call
              SocketChannel channel = serverChannel.accept();

              
          // create IoSocketHandler
              
          //創建事件分發器
              IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
              
          //創建io處理
              IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
              
          // notify call back
              callback.onConnectionAccepted(ioHandler);//很關鍵的一個地方,會注冊SelectionKey.OP_READ,此時客戶端發來的消息就可以北服務端獲取
          }


          查看callback.onConnectionAccepted(ioHandler);
          此方法會初始化server端的ioHandler,查看初始化的代碼:
          dispatcher.register(this, SelectionKey.OP_READ);首先會注冊read選擇器,
          如果有read事件發生dispatcher就會處理??纯磀ispatcher的run方法(通過一個循環來不停的處理已經注冊的事件)

          while(isOpen.get()) {
              
              
          int eventCount = selector.select(5000); 
             
              handledTasks 
          = performRegisterHandlerTasks();//處理事件
              handledTasks += performKeyUpdateTasks();
              
          if (eventCount > 0{
               handleReadWriteKeys();
          //處理讀寫,調用我們自己定義的hander來處理onData等事件
              }

              handledTasks 
          += performDeregisterHandlerTasks();
                
            }



            
          (4)客戶端
          NonBlockingConnection,例如:
          new NonBlockingConnection("localhost", 8090,new MyHandler() )
          構造方法的主要代碼:

           .
           SocketChannel channel 
          = openSocket(localAddress, options);//實際調用:SocketChannel channel = SocketChannel.open();
             
           IoConnector connector 
          = getDefaultConnector();
                 
              IIoConnectorCallback callback 
          = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
              connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);
                
          }



          建立連接,生成IoConnector用來管理連接,然后connector開始啟動,做一些初始化的工作:

          其中connector.connectAsync(…方法會執行會產生一個RegisterTask任務到IoConnector,這個RegisterTask做的事情如下:
          selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注冊SelectionKey.OP_CONNECT
          當IoConnector運行會執行這個任務:
          看下他的run方法主要代碼:

          while(isOpen.get()) {
                     
                          handledTasks 
          = performTaskQueue();//首先運行上一步創建的RegisterTask注冊SelectionKey.OP_CONNECT
                          int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已經準備好
                          if (eventCount > 0{
                              handleConnect();
          //如果準備好就處理連接事件
                          }
           else {
                           checkForLooping(handledTasks);
                          }

                   
            }



           handleConnect()的代碼:
           

           private void handleConnect() {
                  Set
          <SelectionKey> selectedEventKeys = selector.selectedKeys();

                  Iterator
          <SelectionKey> it = selectedEventKeys.iterator();
                  
          while (it.hasNext()) {
                      SelectionKey eventKey 
          = it.next();
                      it.remove();

                      RegisterTask registerTask 
          = (RegisterTask) eventKey.attachment();

                      
          if (eventKey.isValid() && eventKey.isConnectable()) {
                          
                          
          try {
                              
          boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已經通訊連接
                              if (isConnected) {
                               eventKey.cancel();
                               registerTask.callback.onConnectionEstablished();
          //連接建立好就做下一步工作,注冊read事件。
                              }

                          
                      }

                  }

              }


           接著看registerTask.callback.onConnectionEstablished()
           主要初始化iohander并注冊read事件

           private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException {
            
          this.ioHandler = ioHandler;
            ioHandler.init(handlerCallback);
          //這個方法里面注冊了read
            isConnected.set(true);//這個時候通訊連接才真正建立起來了
           }



          繼續看ioHandler.init(handlerCallback)方法:

          public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException {
               
            dispatcher.register(
          this, SelectionKey.OP_READ);//注冊SelectionKey.OP_READ時間,可以接受服務端的消息了
          }



          服務端和客戶端就可以互相通信了。本文大致講解了xsocket的代碼的流程,其中講解有誤的地方請兄弟們指出,多謝!


          評論

          # re: xsocket源碼解讀  回復  更多評論   

          2011-10-20 08:32 by tbw
          不錯 不錯 學習了

          # re: xsocket源碼解讀  回復  更多評論   

          2012-03-05 16:18 by Miao
          請問,xsocket怎么連接代理呢??實在找不到相關資料....謝謝..

          # re: xsocket源碼解讀  回復  更多評論   

          2014-11-21 18:51 by xsank
          有幫助,寫樓主分享
          主站蜘蛛池模板: 且末县| 陆丰市| 临泉县| 蓬莱市| 淄博市| 龙口市| 陕西省| 肥城市| 临夏县| 乾安县| 丰城市| 宁远县| 合江县| 德令哈市| 新巴尔虎左旗| 德格县| 日土县| 广西| 兴隆县| 霍邱县| 北安市| 瓦房店市| 荣成市| 开鲁县| 邮箱| 武安市| 昭通市| 许昌市| 德惠市| 美姑县| 冀州市| 宣恩县| 黔东| 建始县| 剑河县| 拜泉县| 漯河市| 宁乡县| 琼结县| 比如县| 西和县|