I want to fly higher
          programming Explorer
          posts - 114,comments - 263,trackbacks - 0

          1.Future模式是一種常見的多線程設(shè)計模式,用來實現(xiàn)'異步'.其模式的核心在于去除main中的等待時間并使得原本需要等待的時間段可以用于處理其他的業(yè)務(wù)邏輯,從而充分利用計算機(jī)資源.

          2.調(diào)用方式example:
                      ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", port));
                      future.awaitUninterruptibly();
                      session = future.getSession();

          3.相關(guān)類圖

              

          4.源碼_IoFuture

          /**
           * 表示一個異步io操作的completion.
           * 可通過IoFutureListener監(jiān)聽completion
           
          */

          public interface IoFuture {
              
          /**
               * 返回該future關(guān)聯(lián)的session
               
          */

              IoSession getSession();

              
          /**
               * 等待異步操作完成
               * 關(guān)聯(lián)的listener將被通知完成
               
          */

              IoFuture await() 
          throws InterruptedException;

              
          /**
               * 等待異步操作完成并指定超時時間
               *
               * 
          @return <tt>true</tt> if the operation is completed.
               
          */

              
          boolean await(long timeout, TimeUnit unit) throws InterruptedException;

              
          /**
               * 等待異步操作完成并指定超時時間(毫秒)
               *
               * 
          @return <tt>true</tt> if the operation is completed.
               
          */

              
          boolean await(long timeoutMillis) throws InterruptedException;

              
          /**
               * 等待異步操作完成且不可中斷
               * 關(guān)聯(lián)的listener將被通知完成
               * 
               * 
          @return the current IoFuture
               
          */

              IoFuture awaitUninterruptibly();

              
          /**
               * 等待異步操作完成并指定超時時間
               * 不可中斷
               *
               * 
          @return <tt>true</tt> if the operation is completed.
               
          */

              
          boolean awaitUninterruptibly(long timeout, TimeUnit unit);

              
          /**
               * 等待異步操作完成并指定超時時間(毫秒)
               * 不可中斷
               *
               * 
          @return <tt>true</tt> if the operation is finished.
               
          */

              
          boolean awaitUninterruptibly(long timeoutMillis);

              
          /**
               * 已廢棄. 替換方法為 {
          @link #awaitUninterruptibly()}.
               
          */

              @Deprecated
              
          void join();

              
          /**
               * 已廢棄. 替換方法為 {
          @link #awaitUninterruptibly(long)}.
               
          */

              @Deprecated
              
          boolean join(long timeoutMillis);

              
          /**
               * 判斷異步操作是否完成
               
          */

              
          boolean isDone();

              
          /**
               * 添加一個監(jiān)聽Future事件完成的listener
               
          */

              IoFuture addListener(IoFutureListener
          <?> listener);

              
          /**
               * 移除一個已存在的FutureListener
               
          */

              IoFuture removeListener(IoFutureListener
          <?> listener);
          }

              1.從源碼看,其提供了三類await方法

                    1.await,一直等待至操作完成
                    2.await(long),在指定的超時時間范圍內(nèi)等待
                    3.awaitUninterruptibly,等待且不可中斷

               2.對外提供了FutureListener.

          5.源碼_DefaultIoFuture

               1.其實現(xiàn)了接口IoFuture,并提供了默認(rèn)實現(xiàn).

               2.其內(nèi)部有一個lock:
                   private final Object lock;//該lock用于await方法

                   該lock在DefaultIoFuture構(gòu)造的時候初始化為this.

                     public DefaultIoFuture(IoSession session) {
                        this.session = session;
                       this.lock = this;
                       }

               3.await()方法實現(xiàn)

          public IoFuture await() throws InterruptedException {
                   
          synchronized (lock) {
                       
          while (!ready) {
                           waiters
          ++;
                           
          try {
                               
          // 等待notify
                               
          // 假定尋在死鎖
                               
          // 循環(huán)檢查潛在的死鎖
                               lock.wait(DEAD_LOCK_CHECK_INTERVAL);
                           }
           finally {
                               waiters
          --;
                               
          if (!ready) {
                                   checkDeadLock();
                               }

                                }

                           }

                          }

                      
          return this;
              }


                  1.ready表示異步操作是否完成
          {@link isDone()}
             

          public boolean isDone() {
                      
          synchronized (lock) {
                         
          return ready;
                      }

                 }

           
                   2.DEAD_LOCK_CHECK_INTERVAL為檢查死鎖的周期時間,為5000L,即5s.
                   3.waiters表示當(dāng)前正在wait的的線程數(shù)目,如果大于0,則表示有線程正在阻塞在wait方法.所以可以用這個變量判斷是否notifyAll.

                   4.checkDeadLock()為檢查死鎖的方法:
                     其核心思路是檢查當(dāng)前調(diào)用線程的StackTrace,如果此時調(diào)用await的線程是I/O processor thread,則表示死鎖發(fā)生(詳見死鎖的發(fā)生條件),則拋出IllegalStateException.

                   5.整體代碼邏輯是在異步操作未完成以前循環(huán)的wait(5000L)并在超時后檢查死鎖直到ready.

               4.await0()方法實現(xiàn)

              private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
                             
          long endTime = System.currentTimeMillis() + timeoutMillis;

                             
          if (endTime < 0) {
                                  endTime
          = Long.MAX_VALUE;
                              }


                         
          synchronized (lock) {
                             
          if (ready) {
                                 
          return ready;
                              }
          else if (timeoutMillis <= 0) {
                     
          // 這里如果timeoutMillis小于等于0,則直接返回ready flag
                                  return ready;
                              }


                              waiters
          ++;

                             
          try {
                       
          // 用一個無限循環(huán)來實現(xiàn)awaitUninterruptibly
                                  for (;;) {
                                     
          try {
                                         
          long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
                                          lock.wait(timeOut);
                                      }
          catch (InterruptedException e) {
                         
          // 這里說明當(dāng)前線程在wait的時候被中斷了(調(diào)用了interrupt).如果可被中斷則繼續(xù)向上層拋出InterruptedException.否則catch住不做任何操作,繼續(xù)執(zhí)行                //interruptable為false的時候表示Uninterruptibly.
                                          if (interruptable) {
                           
          // 可中斷則直接向上層拋出異常,則整個方法調(diào)用就結(jié)束了.反之則會繼續(xù)執(zhí)行catch后代碼
                                              throw e;
                                          }

                                      }


                                     
          if (ready) {
                                         
          return true;
                                      }

                     
                            
          // 該判斷主要用來判斷超時
                                      if (endTime < System.currentTimeMillis()) {
                                         
          return ready;
                                      }

                                  }

                              }
          finally {
                                  waiters
          --;
                                 
          if (!ready) {
                                      checkDeadLock();
                                  }

                              }

                          }

                      }

                    1.該方法是內(nèi)部的一個private方法.其中第一個參數(shù)指定超時時間,第二個參數(shù)指明在wait的時候是否可被中斷.
                     2.await(long timeout, TimeUnit unit)/await(long timeoutMillis)/awaitUninterruptibly的實現(xiàn)最終都調(diào)用了await0.
                     3.整體代碼的實現(xiàn)即用一個無限循環(huán)+捕捉中斷異常來實現(xiàn)awaitUninterruptibly.用endTime來實現(xiàn)超時.

                    4.仍然進(jìn)行了潛在死鎖的檢查.
                      另外有一個問題是每次wait的時間都是Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL).如果timeoutMillis為6s,則實際wait的時間是5 + 5  = 10s.不知道作者的這個設(shè)計是什么.

                    5.await(long timeoutMillis)調(diào)用了await0(timeoutMillis, true)/awaitUninterruptibly()調(diào)用了await0(Long.MAX_VALUE, false).

               5.setValue()方法實現(xiàn)

          public void setValue(Object newValue) {
                   
          synchronized (lock) {
                       
          // 僅一次
                        if (ready) {
                           
          return;
                        }

            
            
          // 設(shè)置操作結(jié)果
                        result = newValue;
            
          // 操作完成
                        ready = true;
                       
          if (waiters > 0) {
               
          // 喚醒a(bǔ)wait的線程
                            lock.notifyAll();
                        }

                    }


                  
          // 通知監(jiān)聽器操作完成
                    notifyListeners();
                }

              
                    這個方法是設(shè)置異步操作的結(jié)果表示操作完成.

          6.源碼_IoFutureListener

          /**
           * 泛型接口,監(jiān)聽異步io操作完成
          */

          1.public interface IoFutureListener<extends IoFuture> extends EventListener {
              
          /**
               *  自實現(xiàn)的一個CLOSE_LISTENER.用于操作完成執(zhí)行關(guān)閉session
               
          */

              
          static IoFutureListener<IoFuture> CLOSE = new IoFutureListener<IoFuture>() {
                  
          public void operationComplete(IoFuture future) {
                      future.getSession().close(
          true);
                  }

              }
          ;

              
          /** 
               * 操作完成時的回調(diào)接口
               
          */

              
          void operationComplete(F future);
          }


              1.DefaultIoFuture的源碼中提供了添
          加/移除/通知的方法.不過有一個比較疑惑的地方是:其提供了一個firstListener和一個otherListeners,不知道為什么有這樣的需求.
              2.DefaultIoFuture#addListener中對是否ready做了判斷.只要ready就會通知.也就是在操作已經(jīng)完成的情況下添加一個監(jiān)聽器,則也會執(zhí)行回調(diào)方法.


          7.
          when i/o operation complete?

               1.ConnectFuture
                {@link TailFilter#sessionCreated},調(diào)用了future.setSession(session)->調(diào)用setValue.->表示異步connect完成.
               2.CloseFuture
                {@link DefaultIoFilterChain#fireSessionClosed},調(diào)用了session.getCloseFuture().setClosed()->表示異步關(guān)閉完成
               3.WriteFuture
                {@link DefaultIoFilterChain#fireMessageSent},調(diào)用了request.getFuture().setWritten()>表示異步寫完成
               4.ReadFuture
                {@link TailFilter#messageReceived},如果isUseReadOperation{@link IoSessionConfig},則執(zhí)行setRead(Object message)->表示異步讀完成.

          8.總結(jié)
              本篇介紹了mina內(nèi)部異步的實現(xiàn)方式Future.著重介紹了await/awaitUninterruptly的實現(xiàn)方法等.

          posted on 2013-11-28 17:50 landon 閱讀(2261) 評論(1)  編輯  收藏 所屬分類: Sources

          FeedBack:
          # re: apache-mina-2.07源碼筆記3-Future
          2013-11-29 15:38 | 馮磊
          看著不錯 感謝分享  回復(fù)  更多評論
            
          主站蜘蛛池模板: 绍兴县| 凤翔县| 满城县| 新郑市| 漠河县| 安吉县| 金坛市| 抚顺市| 鄂托克前旗| 汉寿县| 安远县| 广河县| 九龙城区| 双牌县| 云浮市| 榆社县| 诸城市| 全州县| 新泰市| 黄冈市| 桓台县| 天台县| 肇源县| 沙河市| 安多县| 武冈市| 招远市| 伊宁市| 资兴市| 繁峙县| 达日县| 济阳县| 清丰县| 运城市| 景德镇市| 三亚市| 玉山县| 成都市| 尼玛县| 特克斯县| 怀宁县|