posts - 9, comments - 4, trackbacks - 0, articles - 21

          生產者/消費者模型

          Posted on 2007-10-27 16:23 一步一步努力向上爬 閱讀(3552) 評論(0)  編輯  收藏 所屬分類: 設計模式
          生產者/消費者模型是最基本的并發協作模型,是所有并發協作的基礎。可以這么說,其他的并發協作都是供求關系模型的變種。生產者,消費者之間的供求關系可以簡單的 使用管道來構造。讓我們看兩者之間的行為模式: *生產/消費模型:消費者如果無消費對象,就會阻塞直到有消費對象到達;一個消費對象僅供一個消費者消費。 *BlockingQueue: 如果隊列為空,則讀取操作將會阻塞直至隊列有新的內容到達;隊列中對象一旦被讀取,將從隊列中移走。 由此可見,阻塞隊列天然符合生產/消費模型的供求行為模式。在前面展示condition的用法的時候,曾經 用過生產者/消費者模型來舉例。那個例子如果改用BlockingQueue來寫的話就十分簡單
              ...
          BlockingQueue<String> q =new ArrayBlockingQueue<String> (10);
          ...
          public void supply () {
          q.put("product by "+Thread.currentThread().getId()+":"+(++productNo));
          }
          ...
          public void cunsume () {
          String product =q.take();
          System.out.println("consume product:"+product);
          }
          從BlockingQueue也可以看出,它和UNIX系統下面的Pipe十分相似。所不同的不過是兩點,首先,pipe是進程間的,命名管道甚至可以在非親緣進程間使用,而BlockingQueue 目前只是線程間的通信手段。當然,由于java本身強大的動態類裝載功能,這個缺陷對java程序之間的溝通限制并不大。其次,pipe是基于字節流的,而BlockingQueue是 基于對象的,這使得BlockingQueue更加易用,不過卻讓BlockingQueue綁定了Java語言,使進一步成為輕量級本地進程通信工具的難度增大。

          從前面對生產/消費模型的行為方式可以看出,生產/消費模型著重于規范消費者的行為模式,當消費速度超過生產速度的時候,消費者就會被阻塞。而對于生產者的行為則沒有 規定。當生產速度超過消費速度,生產者的行為模式可以分為以下幾種: #當積壓的產品達到一定數量時,生產者被阻塞 #無論有多少積壓產品,生產者都不會被阻塞 #不能有任何積壓產品,生產者在當前產品未被消費之前,會被阻塞 對于產品來說,也有不同的行為模式 #產品只有在被生產出來一段時間之后才能被消費(先花點時間晾晾干?) #不同類別的產品被消費的優先級不同(有鉆石的話,黃金就先放一邊吧:))

          根據生產者行為模式的不同Concurrent包提供了不同的BlockingQueue的實現 ||Queue種類||行為描述 |ArrayBlockingQueue|有限制的blocking queue,積壓的產品不得超過制訂數量 |DelayQueue|產品只有生產出一段時間之后,才能被消費,無限制的積壓產品 |LinkedBlockingQueue|同時支持有限制的blocking queue,也能支持無限制的積壓產品(數量不能超過Integer.MAX_VALUE) |PriorityBlockingQueue|不同產品的被消費優先級不同,無限制的積壓產品 |SynchronousQueue|不允許積壓產品

          這些不同的行為模式中,較為常見的除了ArrayBlockingQueue和LinkedBlockingQueue之外,PriorityBlockingQueue也非常重要。舉例來說,如果我們利用BlockingQueue 來實現一個郵件系統(著名的qmail就是利用pipe技術構建的核心架構)。我們知道郵件有不同的級別,如果當前隊列里有加急郵件需要處理的話,系統將優先處理加急郵件。 我們將以郵件傳遞為例子,說明PriorityBlockingQueue的使用方法。(注:這里的這個郵件模型只是一個非常簡陋的模型,用來說明PriorityBlockingQueue的使用方法而已, 和實際應用有很大的差距)

          首先,我們需要了解郵件傳遞過程的基本模型。在這個簡單的郵件傳送模型中涉及到下列概念 *MDA: Mail Deliver Agent, 負責接受指定用戶的郵件。 *MTA: Mail Transfer Agent, 負責接受遠程傳送過來的郵件,并將其傳送給收件人的MDA 它們和郵件用戶之間的關系如下圖

          其中MTA使用Queue傳送郵件給MDA。因此,不同的用戶會使用不同的Mail Queue。下面是MailQueue的代碼
          public class MailQueue<E> extends PriorityBlockingQueue<E>{
          public E take () throws InterruptedException {
          E ren =super.take();
          Utils._log("take:"+ren);
          return ren;
          }

          public void put (E o) {
          super.put(o);
          Utils._log("put:"+o);
          }
          }
          為了能夠根據收件人的Mail Address找到相應的Mail Queue, 使用一個MailQueueFactory來產生MailQueue
          public class MailQueueFactory {
          //A ConcurrentHashMap is used here instead of Hashtable
          static ConcurrentHashMap<MailAccount,MailQueue<Mail>> mailQueues =
          new ConcurrentHashMap<MailAccount,MailQueue<Mail>>();
          public static BlockingQueue<Mail> getMailQueue (MailDeliverer e) {
          return getMailQueue(e.getMailAccount());
          }

          public static BlockingQueue<Mail> getReceiveMailQueue (Mail m) {
          return getMailQueue (m.getReceiver());
          }

          public static BlockingQueue<Mail> getMailQueue (MailAccount e) {
          mailQueues.putIfAbsent (e,new MailQueue<Mail>());
          MailQueue<Mail> mailQ =mailQueues.get(e);

          return mailQ;
          }
          }
          需要注意的是,我們在MailQueueFactory里面使用了ConcurrentHashMap,而不是傳統的Hashtable, 雖然Hashtable是thread-safe,但是缺乏putIfAbsent這樣的 原子函數,如果不小心設計的話,會造成對同一個MailQueue重復初始化,從而導致死鎖問題。 下面看Mail的定義
          public class Mail implements Comparable{
          public final static int emergencyMail =0;
          public final static int normalMail =1;

          static AtomicInteger serialCounter =new AtomicInteger(0);

          private int mailLevel;
          private int serialNumber =serialCounter.addAndGet(1);
          private MailAccount receiver =null;
          private MailAccount sender =null;
          private Date sendTime =new Date();

          public Mail (String from, String to, int level) {
          ...
          }

          //Get functions
          ...

          public int compareTo(Object o) {
          if (o instanceof Mail) {
          return compareTo ((Mail)o);
          }
          return 0;
          }

          public int compareTo (Mail o) {
          if (o.mailLevel==this.mailLevel) { //Same level, compare the serial no
          if (o.serialNumber==this.serialNumber)
          return 0;
          if (o.serialNumber>this.serialNumber)
          return -1;
          return 1;
          }
          if (this.mailLevel==emergencyMail) return -1;
          return 1;
          }
          //Other functions
          ...
          }
          這里值得注意的是AtomicInteger的使用,它被用來做內部serialNumber的產生。另外就是compareTo函數的使用,PriorityBlockingQueue使用Comparable接口來判定元素的優先級別。這里所定義的優先級如下: *如果郵件類別相同,則序列號小的郵件有較大的優先級 *如果郵件類別不同,則emergencyMail有較大的優先級 最后是Deliver Agent 和 Transfer Agent的代碼
          public class MailDeliverer {
          MailAccount mailAccount =null;

          public MailDeliverer (MailAccount account) {
          this.mailAccount =account;
          }

          public MailAccount getMailAccount() {
          return mailAccount;
          }

          public Mail retrieveMail () {
          Mail mail =null;
          while (mail==null) {
          try {
          mail =MailQueueFactory.getMailQueue(this).take();
          }catch (Exception e) {
          Utils._log("Encounter Exception",e);
          }
          }
          return mail;
          }
          }

          public class MailTransfer {
          private static MailTransfer instance =new MailTransfer ();
          private MailTransfer () { }

          public static MailTransfer getInstance () {
          return instance;
          }

          public void processMail (Mail m) {
          BlockingQueue mailQ =MailQueueFactory.getReceiveMailQueue(m);
          try {
          mailQ.put(m);
          } catch (InterruptedException e) {
          e.printStackTrace();
          }
          }
          }


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 富民县| 涞源县| 鄂伦春自治旗| 曲阜市| 大厂| 乌兰察布市| 永宁县| 柏乡县| 永康市| 赣州市| 青阳县| 依安县| 静安区| 南丰县| 东海县| 和平区| 隆子县| 昌乐县| 墨江| 精河县| 隆安县| 东明县| 桦甸市| 织金县| 福建省| 双流县| 图片| 玉山县| 仁布县| 元朗区| 南城县| 兴宁市| 子长县| 松阳县| 会同县| 遂宁市| 自贡市| 无棣县| 涡阳县| 鄂尔多斯市| 广河县|