(轉) 用JAVA 實現(xiàn)“生產(chǎn)者-消費者”問題

          生產(chǎn)者和消費者問題是從操作系統(tǒng)中的許多實際同步問題中抽象出來的具有
          代表性的問題。它反映了操作系統(tǒng)中典型的同步例子。

            生產(chǎn)者進程(進程由多個線程組成)生產(chǎn)信息,例如它可以是計算進程。消費
          者進程使用信息,它可以是輸出打印進程。由于生產(chǎn)者和消費者彼此獨立,且運
          行速度不確定,所以很可能出現(xiàn)生產(chǎn)者已產(chǎn)生了信息而消費者卻沒有來得及接受
          信息這種情況。為此,需要引入由一個或者若干個存儲單元組成的臨時存儲區(qū),
          以便存放生產(chǎn)者所產(chǎn)生的信息,平滑進程間由于速度不確定所帶來的問題。這個
          臨時存儲區(qū)叫做緩沖區(qū),通常用一維數(shù)組來表示。

            由一個或若干個存儲單元組成的緩沖區(qū)叫作“有窮緩沖區(qū)”。下面我們來分
          析一下有窮緩沖的生產(chǎn)者和消費者的例子。

            假設有多個生產(chǎn)者和多個消費者,它們共享一個具有n個存儲單元的有窮緩沖
          區(qū)Buffer(0……n-1),這是一個環(huán)形隊列。其隊尾指針Rear指向當前信息應存放
          的位置(Buffer[Rear]),隊首指針Front指向當前取出信息的位置(Buffer[front
          ])。生產(chǎn)者進程總是把信息存放在Buffer[Rear]中,消費者進程則總是從Buffer
          [Rear]中取出信息。如果想使生產(chǎn)者進程和消費者進程協(xié)調(diào)合作,則必須使它們
          遵循如下規(guī)則:

            1) 只要緩沖區(qū)有存儲單元,生產(chǎn)者都可往其中存放信息;當緩沖區(qū)已滿時,
          若任意生產(chǎn)者提出寫要求,則都必須等待;

            2) 只要緩沖區(qū)中有消息可取,消費者都可從緩沖區(qū)中取出消息;當緩沖區(qū)為
          空時,若任意消費者想取出信息,則必須等待;

            3) 生產(chǎn)者們和消費者們不能同時讀、寫緩沖區(qū)。

            用JAVA 實現(xiàn)“生產(chǎn)者-消費者”問題的代碼如下:

          public class ProducerConsumer {
           public static void main(String[] args) {
            SyncStack ss = new SyncStack();
            Producer p = new Producer(ss);
            Consumer c = new Consumer(ss);
            new Thread(p).start();
            new Thread(p).start();
            new Thread(p).start();
            new Thread(c).start();
           }
          }

          class WoTou {
           int id;
           WoTou(int id) {
            this.id = id;
           }
           public String toString() {
            return "WoTou : " + id;
           }
          }

          class SyncStack {
           int index = 0;
           WoTou[] arrWT = new WoTou[6];
           
           public synchronized void push(WoTou wt) {
            while(index == arrWT.length) {
             try {
              this.wait();
             } catch (InterruptedException e) {
              e.printStackTrace();
             }
            }
            this.notifyAll(); 
            arrWT[index] = wt;
            index ++;
           }
           
           public synchronized WoTou pop() {
            while(index == 0) {
             try {
              this.wait();
             } catch (InterruptedException e) {
              e.printStackTrace();
             }
            }
            this.notifyAll();
            index--;
            return arrWT[index];
           }
          }

          class Producer implements Runnable {
           SyncStack ss = null;
           Producer(SyncStack ss) {
            this.ss = ss;
           }
           
           public void run() {
            for(int i=0; i<20; i++) {
             WoTou wt = new WoTou(i);
             ss.push(wt);
          System.out.println("生產(chǎn)了:" + wt);
             try {
              Thread.sleep((int)(Math.random() * 200));
             } catch (InterruptedException e) {
              e.printStackTrace();
             }  
            }
           }
          }

          class Consumer implements Runnable {
           SyncStack ss = null;
           Consumer(SyncStack ss) {
            this.ss = ss;
           }
           
           public void run() {
            for(int i=0; i<20; i++) {
             WoTou wt = ss.pop();
          System.out.println("消費了: " + wt);
             try {
              Thread.sleep((int)(Math.random() * 1000));
             } catch (InterruptedException e) {
              e.printStackTrace();
             }  
            }
           }
          }

          生產(chǎn)者消費者問題是研究多線程程序時繞不開的問題,它的描述是有一塊生產(chǎn)者和消費者共享的有界緩沖區(qū),生產(chǎn)者往緩沖區(qū)放入產(chǎn)品,消費者從緩沖區(qū)取走產(chǎn)品,這個過程可以無休止的執(zhí)行,不能因緩沖區(qū)滿生產(chǎn)者放不進產(chǎn)品而終止,也不能因緩沖區(qū)空消費者無產(chǎn)品可取而終止。

                 解決生產(chǎn)者消費者問題的方法有兩種,一種是采用某種機制保持生產(chǎn)者和消費者之間的同步,一種是在生產(chǎn)者和消費者之間建立一個管道。前一種有較高的效率并且可控制性較好,比較常用,后一種由于管道緩沖區(qū)不易控制及被傳輸數(shù)據(jù)對象不易封裝等原因,比較少用。
                 同步問題的核心在于,CPU是按時間片輪詢的方式執(zhí)行程序,我們無法知道某一個線程是否被執(zhí)行、是否被搶占、是否結束等,因此生產(chǎn)者完全可能當緩沖區(qū)已滿的時候還在放入產(chǎn)品,消費者也完全可能當緩沖區(qū)為空時還在取出產(chǎn)品。
                 現(xiàn)在同步問題的解決方法一般是采用信號或者加鎖機制,即生產(chǎn)者線程當緩沖區(qū)已滿時放棄自己的執(zhí)行權,進入等待狀態(tài),并通知消費者線程執(zhí)行。消費者線程當緩沖區(qū)已空時放棄自己的執(zhí)行權,進入等待狀態(tài),并通知生產(chǎn)者線程執(zhí)行。這樣一來就保持了線程的同步,并避免了線程間互相等待而進入死鎖狀態(tài)。
                 JAVA語言提供了獨立于平臺的線程機制,保持了”write once, run anywhere”的特色。同時也提供了對同步機制的良好支持。
                 在JAVA中,一共有四種方法支持同步,其中三個是同步方法,一個是管道方法。
          1.       方法wait()/notify()
          2.       方法await()/signal()
          3.       阻塞隊列方法BlockingQueue
          4.       管道方法PipedInputStream/PipedOutputStream
          下面我們看各個方法的實現(xiàn):
          1.       方法wait()/notify()
          wait()和notify()是根類Object的兩個方法,也就意味著所有的JAVA類都會具有這個兩個方法,為什么會被這樣設計呢?我們可以認為所有的對象默認都具有一個鎖,雖然我們看不到,也沒有辦法直接操作,但它是存在的。
          wait()方法表示:當緩沖區(qū)已滿或空時,生產(chǎn)者或消費者線程停止自己的執(zhí)行,放棄鎖,使自己處于等待狀態(tài),讓另一個線程開始執(zhí)行;
          notify()方法表示:當生產(chǎn)者或消費者對緩沖區(qū)放入或取出一個產(chǎn)品時,向另一個線程發(fā)出可執(zhí)行通知,同時放棄鎖,使自己處于等待狀態(tài)。
          下面是一個例子代碼:
          import java.util.LinkedList;

          public class Sycn1...{
              private LinkedList<Object> myList =new LinkedList<Object>();
              private int MAX = 10;
             
              public Sycn1()...{
              }
             
              public void start()...{
                      new Producer().start();
                      new Consumer().start();
              }
             
              public static void main(String[] args) throws Exception...{
                  Sycn1 s1 = new Sycn1();
                  s1.start();
              }
             
              class Producer extends Thread...{       
                  public void run()...{
                      while(true)...{
                          synchronized(myList)...{
                              try...{
                                  while(myList.size() == MAX)...{
                                      System.out.println("warning: it's full!");
                                      myList.wait();
                                  }
                                  Object o = new Object();
                                  if(myList.add(o))...{
                                      System.out.println("Producer: " + o);
                                      myList.notify();
                                  }
                              }catch(InterruptedException ie)...{
                                  System.out.println("producer is interrupted!");
                              }
                          }
                      }
                  }
              }
             
              class Consumer extends Thread...{
                  public void run()...{
                      while(true)...{
                          synchronized(myList)...{
                              try...{
                                  while(myList.size() == 0)...{
                                      System.out.println("warning: it's empty!");
                                      myList.wait();
                                  }
                                  Object o = myList.removeLast();
                                  System.out.println("Consumer: " + o);
                                  myList.notify();
                              }catch(InterruptedException ie)...{
                                  System.out.println("consumer is interrupted!");
                              }
                          }
                      }
                  }
              }
             
          }
          2.       方法await()/signal()
          在JDK5.0以后,JAVA提供了新的更加健壯的線程處理機制,包括了同步、鎖定、線程池等等,它們可以實現(xiàn)更小粒度上的控制。await()和signal()就是其中用來做同步的兩種方法,它們的功能基本上和wait()/notify()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。
          下面是一個例子代碼:
          import java.util.LinkedList;

          import java.util.concurrent.locks.*;

          public class Sycn2...{
              private LinkedList<Object> myList = new LinkedList<Object>();
              private int MAX = 10;
              private final Lock lock = new ReentrantLock();
              private final Condition full = lock.newCondition();
              private final Condition empty = lock.newCondition();
             
              public Sycn2()...{
              }
             
              public void start()...{
                      new Producer().start();
                      new Consumer().start();
              }
             
              public static void main(String[] args) throws Exception...{
                  Sycn2 s2 = new Sycn2();
                  s2.start();
              }
             
              class Producer extends Thread...{       
                  public void run()...{
                      while(true)...{
                          lock.lock();
                          try...{
                              while(myList.size() == MAX)...{
                                  System.out.println("warning: it's full!");
                                  full.await();
                              }
                              Object o = new Object();
                              if(myList.add(o))...{
                                  System.out.println("Producer: " + o);
                                  empty.signal();
                              }
                          }catch(InterruptedException ie)...{
                              System.out.println("producer is interrupted!");
                          }finally...{
                              lock.unlock();
                          }
                      }
                  }
              }
             
              class Consumer extends Thread...{
                  public void run()...{
                      while(true)...{
                          lock.lock();
                          try...{
                              while(myList.size() == 0)...{
                                  System.out.println("warning: it's empty!");
                                  empty.await();
                              }
                              Object o = myList.removeLast();
                              System.out.println("Consumer: " + o);
                              full.signal();
                          }catch(InterruptedException ie)...{
                              System.out.println("consumer is interrupted!");
                          }finally...{
                              lock.unlock();
                          }
                      }
                  }
              }
             
          }
          3.       阻塞隊列方法BlockingQueue
          BlockingQueue也是JDK5.0的一部分,它是一個已經(jīng)在內(nèi)部實現(xiàn)了同步的隊列,實現(xiàn)方式采用的是我們的第2種await()/signal()方法。它可以在生成對象時指定容量大小。
          它用于阻塞操作的是put()和take()方法。
          put()方法類似于我們上面的生產(chǎn)者線程,容量最大時,自動阻塞。
          take()方法類似于我們上面的消費者線程,容量為0時,自動阻塞。
          下面是一個例子代碼:
          import java.util.concurrent.*;

          public class Sycn3...{
              private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10);
              private int MAX = 10;
             
              public Sycn3()...{
              }
             
              public void start()...{
                      new Producer().start();
                      new Consumer().start();
              }
             
              public static void main(String[] args) throws Exception...{
                  Sycn3 s3 = new Sycn3();
                  s3.start();
              }
             
              class Producer extends Thread...{       
                  public void run()...{
                      while(true)...{
                          //synchronized(this){
                          try...{
                              if(queue.size() == MAX)
                                  System.out.println("warning: it's full!");
                              Object o = new Object();
                              queue.put(o);
                              System.out.println("Producer: " + o);
                              }catch(InterruptedException e)...{
                                  System.out.println("producer is interrupted!");
                              }
                          //}
                      }
                  }
              }
             
              class Consumer extends Thread...{
                  public void run()...{
                      while(true)...{
                          //synchronized(this){
                          try...{
                              if(queue.size() == 0)
                                  System.out.println("warning: it's empty!");
                              Object o = queue.take();
                              System.out.println("Consumer: " + o);
                              }catch(InterruptedException e)...{
                                  System.out.println("producer is interrupted!");
                              }
                          //}
                      }
                  }
              }
             
          }
          你發(fā)現(xiàn)這個例子中的問題了嗎?
          如果沒有,我建議你運行一下這段代碼,仔細觀察它的輸出,是不是有下面這個樣子的?為什么會這樣呢?

          warning: it's full!
          Producer: java.lang.object@4526e2a

          你可能會說這是因為put()和System.out.println()之間沒有同步造成的,我也這樣認為,我也這樣認為,但是你把run()中的synchronized前面的注釋去掉,重新編譯運行,有改觀嗎?沒有。為什么?
          這是因為,當緩沖區(qū)已滿,生產(chǎn)者在put()操作時,put()內(nèi)部調(diào)用了await()方法,放棄了線程的執(zhí)行,然后消費者線程執(zhí)行,調(diào)用take()方法,take()內(nèi)部調(diào)用了signal()方法,通知生產(chǎn)者線程可以執(zhí)行,致使在消費者的println()還沒運行的情況下生產(chǎn)者的println()先被執(zhí)行,所以有了上面的輸出。run()中的synchronized其實并沒有起什么作用。
          對于BlockingQueue大家可以放心使用,這可不是它的問題,只是在它和別的對象之間的同步有問題。
          對于這種多重嵌套同步的問題,以后再談吧,歡迎大家討論啊!
          4.       管道方法PipedInputStream/PipedOutputStream
          這個類位于java.io包中,是解決同步問題的最簡單的辦法,一個線程將數(shù)據(jù)寫入管道,另一個線程從管道讀取數(shù)據(jù),這樣便構成了一種生產(chǎn)者/消費者的緩沖區(qū)編程模式。
          下面是一個例子代碼,在這個代碼我沒有使用Object對象,而是簡單的讀寫字節(jié)值,這是因為PipedInputStream/PipedOutputStream不允許傳輸對象,這是JAVA本身的一個bug,具體的大家可以看sun的解釋:http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4131126
          import java.io.*;

          public class Sycn4...{
              private PipedOutputStream pos;
              private PipedInputStream pis;
              //private ObjectOutputStream oos;
              //private ObjectInputStream ois;
             
              public Sycn4()...{
                  try...{
                      pos = new PipedOutputStream();
                      pis = new PipedInputStream(pos);
                      //oos = new ObjectOutputStream(pos);
                      //ois = new ObjectInputStream(pis);
                  }catch(IOException e)...{
                      System.out.println(e);
                  }
              }
             
              public void start()...{
                  new Producer().start();
                  new Consumer().start();
              }
             
              public static void main(String[] args) throws Exception...{
                  Sycn4 s4 = new Sycn4();
                  s4.start();
              }
             
              class Producer extends Thread...{
                  public void run() ...{
                      try...{
                          while(true)...{
                              int b = (int) (Math.random() * 255);
                              System.out.println("Producer: a byte, the value is " + b);
                              pos.write(b);
                              pos.flush();
                              //Object o = new MyObject();
                              //oos.writeObject(o);
                              //oos.flush();
                              //System.out.println("Producer: " + o);
                          }
                      }catch(Exception e)...{
                          //System.out.println(e);
                          e.printStackTrace();
                      }finally...{
                          try...{
                              pos.close();
                              pis.close();
                              //oos.close();
                              //ois.close();
                          }catch(IOException e)...{
                              System.out.println(e);
                          }
                      }
                  }
              }
             
              class Consumer extends Thread...{
                  public void run()...{
                      try...{
                          while(true)...{
                              int b = pis.read();
                              System.out.println("Consumer: a byte, the value is " + String.valueOf(b));
                              //Object o = ois.readObject();
                              //if(o != null)
                                  //System.out.println("Consumer: " + o);
                          }
                      }catch(Exception e)...{
                          //System.out.println(e);
                          e.printStackTrace();
                      }finally...{
                          try...{
                              pos.close();
                              pis.close();
                              //oos.close();
                              //ois.close();
                          }catch(IOException e)...{
                              System.out.println(e);
                          }
                      }
                  }
              }
             
              //class MyObject implements Serializable {
              //}
          }

          出處:http://blog.csdn.net/JaunLee/archive/2008/02/01/2077291.aspx

          posted on 2010-05-16 19:03 liujg 閱讀(694) 評論(0)  編輯  收藏 所屬分類: Java基礎

          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345

          導航

          統(tǒng)計

          常用鏈接

          留言簿(1)

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          相冊

          收藏夾

          boddiy

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 泾川县| 呼和浩特市| 黑龙江省| 浮梁县| 安顺市| 自治县| 门源| 兴城市| 陇川县| 呼玛县| 富宁县| 鹰潭市| 甘南县| 海口市| 肃宁县| 沙坪坝区| 彰武县| 永新县| 湛江市| 保德县| 松阳县| 岳池县| 墨脱县| 宜君县| 鹤庆县| 玛纳斯县| SHOW| 克什克腾旗| 石景山区| 荃湾区| 乐山市| 韶关市| 凤城市| 维西| 定安县| 手游| 昌宁县| 天台县| 贡觉县| 镇江市| 石屏县|