我們談一下實際的場景吧。我們在開發中,有如下場景

          a) 關閉空閑連接。服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉之。
          b) 緩存。緩存中的對象,超過了空閑時間,需要從緩存中移出。
          c) 任務超時處理。在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求。

          一種笨笨的辦法就是,使用一個后臺線程,遍歷所有對象,挨個檢查。這種笨笨的辦法簡單好用,但是對象數量過多時,可能存在性能問題,檢查間隔時間不好設置,間隔時間過大,影響精確度,多小則存在效率問題。而且做不到按超時的時間順序處理。

          這場景,使用DelayQueue最適合了。

          DelayQueue 是java.util.concurrent中提供的一個很有意思的類。很巧妙,非常棒!但是java doc和Java SE 5.0的source中都沒有提供Sample。我最初在閱讀ScheduledThreadPoolExecutor源碼時,發現DelayQueue 的妙用。隨后在實際工作中,應用在session超時管理,網絡應答通訊協議的請求超時處理。

          本文將會對DelayQueue做一個介紹,然后列舉應用場景。并且提供一個Delayed接口的實現和Sample代碼。

          DelayQueue是一個BlockingQueue,其特化的參數是Delayed。(不了解BlockingQueue的同學,先去了解BlockingQueue再看本文)
          Delayed擴展了Comparable接口,比較的基準為延時的時間值,Delayed接口的實現類getDelay的返回值應為固定值(final)。DelayQueue內部是使用PriorityQueue實現的。

          DelayQueue = BlockingQueue + PriorityQueue + Delayed

          DelayQueue的關鍵元素BlockingQueue、PriorityQueue、Delayed。可以這么說,DelayQueue是一個使用優先隊列(PriorityQueue)實現的BlockingQueue,優先隊列的比較基準值是時間。

          他們的基本定義如下
          public interface Comparable<T> {
              
          public int compareTo(T o);
          }

          public interface Delayed extends Comparable<Delayed> {
              
          long getDelay(TimeUnit unit);
          }

          public class DelayQueue<extends Delayed> implements BlockingQueue<E> { 
              
          private final PriorityQueue<E> q = new PriorityQueue<E>();
          }

          DelayQueue內部的實現使用了一個優先隊列。當調用DelayQueue的offer方法時,把Delayed對象加入到優先隊列q中。如下:
          public boolean offer(E e) {
              
          final ReentrantLock lock = this.lock;
              lock.lock();
              
          try {
                  E first 
          = q.peek();
                  q.offer(e);
                  
          if (first == null || e.compareTo(first) < 0)
                      available.signalAll();
                  
          return true;
              } 
          finally {
                  lock.unlock();
              }
          }

          DelayQueue的take方法,把優先隊列q的first拿出來(peek),如果沒有達到延時閥值,則進行await處理。如下:
          public E take() throws InterruptedException {
              
          final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              
          try {
                  
          for (;;) {
                      E first 
          = q.peek();
                      
          if (first == null) {
                          available.await();
                      } 
          else {
                          
          long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                          
          if (delay > 0) {
                              
          long tl = available.awaitNanos(delay);
                          } 
          else {
                              E x 
          = q.poll();
                              
          assert x != null;
                              
          if (q.size() != 0)
                                  available.signalAll(); 
          // wake up other takers
                              return x;

                          }
                      }
                  }
              } 
          finally {
                  lock.unlock();
              }
          }

          -------------------

          以下是Sample,是一個緩存的簡單實現。共包括三個類Pair、DelayItem、Cache。如下:

          public class Pair<K, V> {
              
          public K first;

              
          public V second;
              
              
          public Pair() {}
              
              
          public Pair(K first, V second) {
                  
          this.first = first;
                  
          this.second = second;
              }
          }

          --------------
          以下是Delayed的實現
          import java.util.concurrent.Delayed;
          import java.util.concurrent.TimeUnit;
          import java.util.concurrent.atomic.AtomicLong;

          public class DelayItem<T> implements Delayed {
              
          /** Base of nanosecond timings, to avoid wrapping */
              
          private static final long NANO_ORIGIN = System.nanoTime();

              
          /**
               * Returns nanosecond time offset by origin
               
          */
              
          final static long now() {
                  
          return System.nanoTime() - NANO_ORIGIN;
              }

              
          /**
               * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
               * entries.
               
          */
              
          private static final AtomicLong sequencer = new AtomicLong(0);

              
          /** Sequence number to break ties FIFO */
              
          private final long sequenceNumber;

              
          /** The time the task is enabled to execute in nanoTime units */
              
          private final long time;

              
          private final T item;

              
          public DelayItem(T submit, long timeout) {
                  
          this.time = now() + timeout;
                  
          this.item = submit;
                  
          this.sequenceNumber = sequencer.getAndIncrement();
              }

              
          public T getItem() {
                  
          return this.item;
              }

              
          public long getDelay(TimeUnit unit) {
                  
          long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
                  
          return d;
              }

              
          public int compareTo(Delayed other) {
                  
          if (other == this// compare zero ONLY if same object
                      return 0;
                  
          if (other instanceof DelayItem) {
                      DelayItem x 
          = (DelayItem) other;
                      
          long diff = time - x.time;
                      
          if (diff < 0)
                          
          return -1;
                      
          else if (diff > 0)
                          
          return 1;
                      
          else if (sequenceNumber < x.sequenceNumber)
                          
          return -1;
                      
          else
                          
          return 1;
                  }
                  
          long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
                  
          return (d == 0? 0 : ((d < 0? -1 : 1);
              }
          }



          以下是Cache的實現,包括了put和get方法,還包括了可執行的main函數。
          import java.util.concurrent.ConcurrentHashMap;
          import java.util.concurrent.ConcurrentMap;
          import java.util.concurrent.DelayQueue;
          import java.util.concurrent.TimeUnit;
          import java.util.logging.Level;
          import java.util.logging.Logger;

          public class Cache<K, V> {
              
          private static final Logger LOG = Logger.getLogger(Cache.class.getName());

              
          private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>();

              
          private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>();

              
          private Thread daemonThread;

              
          public Cache() {

                  Runnable daemonTask 
          = new Runnable() {
                      
          public void run() {
                          daemonCheck();
                      }
                  };

                  daemonThread 
          = new Thread(daemonTask);
                  daemonThread.setDaemon(
          true);
                  daemonThread.setName(
          "Cache Daemon");
                  daemonThread.start();
              }

              
          private void daemonCheck() {

                  
          if (LOG.isLoggable(Level.INFO))
                      LOG.info(
          "cache service started.");

                  
          for (;;) {
                      
          try {
                          DelayItem
          <Pair<K, V>> delayItem = q.take();
                          
          if (delayItem != null) {
                              
          // 超時對象處理
                              Pair<K, V> pair = delayItem.getItem();
                              cacheObjMap.remove(pair.first, pair.second); 
          // compare and remove
                          }
                      } 
          catch (InterruptedException e) {
                          
          if (LOG.isLoggable(Level.SEVERE))
                              LOG.log(Level.SEVERE, e.getMessage(), e);
                          
          break;
                      }
                  }

                  
          if (LOG.isLoggable(Level.INFO))
                      LOG.info(
          "cache service stopped.");
              }

              
          // 添加緩存對象
              public void put(K key, V value, long time, TimeUnit unit) {
                  V oldValue 
          = cacheObjMap.put(key, value);
                  
          if (oldValue != null)
                      q.remove(key);

                  
          long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
                  q.put(
          new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));
              }

              
          public V get(K key) {
                  
          return cacheObjMap.get(key);
              }

              
          // 測試入口函數
              public static void main(String[] args) throws Exception {
                  Cache
          <Integer, String> cache = new Cache<Integer, String>();
                  cache.put(
          1"aaaa"3, TimeUnit.SECONDS);

                  Thread.sleep(
          1000 * 2);
                  {
                      String str 
          = cache.get(1);
                      System.out.println(str);
                  }

                  Thread.sleep(
          1000 * 2);
                  {
                      String str 
          = cache.get(1);
                      System.out.println(str);
                  }
              }
          }

          運行Sample,main函數執行的結果是輸出兩行,第一行為aaa,第二行為null。
          posted on 2007-04-27 20:04 溫少的日志 閱讀(2022) 評論(2)  編輯  收藏
          Comments
          • # re: 精巧好用的DelayQueue
            溫少的日志
            Posted @ 2007-04-29 22:52
            原來文章中有DelayItem的,昨天加入一些內容時,不小心把DelayItem部分的代碼刪除了。現已經補上,請看正文。  回復  更多評論   
          • # re: 精巧好用的DelayQueue
            chun
            Posted @ 2012-03-11 14:24
            Cache Demo 有個小小的Bug。設置為 daemon 的 Thread無法停止。
            我加了一個變量 threadRunning = true; while(threadRunning)  回復  更多評論   

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
           
          主站蜘蛛池模板: 郯城县| 古交市| 来安县| 灵丘县| 邻水| 赣州市| 织金县| 兴海县| 滦平县| 平顺县| 尼木县| 田林县| 柳江县| 右玉县| 隆昌县| 丰镇市| 靖远县| 长沙市| 合水县| 佳木斯市| 报价| 蒙自县| 新巴尔虎右旗| 永昌县| 和林格尔县| 库伦旗| 旌德县| 全南县| 崇阳县| 禹州市| 剑阁县| 和平县| 获嘉县| 塔河县| 凌源市| 富宁县| 新野县| 原平市| 陇南市| 舞钢市| 固原市|