生產者消費者問題理解與Java實現
生產者消費者問題(英語:Producer-consumer problem),也稱有限緩沖問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了兩個共享固定大小緩沖區的線程——即所謂的“生產者”和“消費者”——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然后重復此過程。與此同時,消費者也在緩沖區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。
要解決該問題,就必須讓生產者在緩沖區滿時休眠(要么干脆就放棄數據),等到下次消費者消耗緩沖區中的數據的時候,生產者才能被喚醒,開始往緩沖區添加數據。同樣,也可以讓消費者在緩沖區空時進入休眠,等到生產者往緩沖區添加數據之后,再喚醒消費者。通常采用進程間通信的方法解決該問題,常用的方法有信號燈法等。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。
Java代碼模擬生產者-消費者
產品類
package org.dennist.thread.demo; /** * * Product.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發送郵件</a> * * @since : 1.0 創建時間: 2013-2-25 上午09:03:38 * * TODO : class Product.java is used for ... * */ public class Product { //產品類 private int productId = 0; public Product(int productId){ this.productId = productId; } public int getProductId() { return productId; } public void setProductId(int productId) { this.productId = productId; } @Override public String toString() { return ""+productId; } } |
倉庫類
package org.dennist.thread.demo; /** * * StoreHouse.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發送郵件</a> * * @since : 1.0 創建時間: 2013-2-25 上午08:55:33 * * TODO : 倉庫 * */ public class StoreHouse { private int base = 0; private int top = 0; //倉庫大小 private Product[] products = new Product[10]; /** * 生產產品 * @param product */ public synchronized void push(Product product){ if(top==products.length){ //如果倉庫已滿,等待消費 try { System.out.println("倉庫已滿,正在等待消費.."); wait(); }catch (InterruptedException e) { System.out.println("stop push product because other reasons"); } } //倉庫未滿,將生產的產品入庫 products[top] = product; //庫中產品數量+1 top++; } /** * 消費產品 * @return */ public synchronized Product pop() { Product product = null; while (top == base) { //倉庫未空,不能消費 notify(); try { System.out.println("倉庫已空,正等待生產..."); wait(); } catch (InterruptedException e) { System.out.println("stop push product because other reasons"); } } //倉庫未空,等待消費 top--; product = products[top]; products[top] = null; return product; } } |
生產者類
package org.dennist.thread.demo; /** * * Producer.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發送郵件</a> * * @since : 1.0 創建時間: 2013-2-25 上午08:53:16 * * TODO : 生產者 * */ public class Producer implements Runnable{ private String producerName ; private StoreHouse storeHouse ; public Producer(String producerName, StoreHouse storeHouse) { this.producerName = producerName; this.storeHouse = storeHouse; } public void setProducerName(String producerName) { this.producerName = producerName; } public String getProducerName() { return producerName; } @Override public void run() { execProcuct(); } private void execProcuct() { int i = 0; while(true){ i++; Product pro = new Product(i); storeHouse.push(pro); System.out.println(getProducerName() + " 生產了 " + pro); try { Thread.sleep(2000); } catch (InterruptedException e) { return; } } } } 消費者類 |
package org.dennist.thread.demo; /** * * Consumer.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發送郵件</a> * * @since : 1.0 創建時間: 2013-2-25 上午08:53:47 * * TODO : 消費者 * */ public class Consumer implements Runnable{ private String consumerName = null; private StoreHouse storeHouse = null; public Consumer(String consumerName, StoreHouse storeHouse) { this.consumerName = consumerName; this.storeHouse = storeHouse; } public void setConsumerName(String consumerName) { this.consumerName = consumerName; } public String getConsumerName() { return consumerName; } public void execConsume() { while (true) { System.out.println(getConsumerName() + " 消費了 " + storeHouse.pop()); try { Thread.sleep(5000); } catch (InterruptedException e) { return; } } } @Override public void run() { execConsume(); } } |
測試主類
package org.dennist.thread.demo; /** * * TestPC.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發送郵件</a> * * @since : 1.0 創建時間: 2013-2-25 上午09:18:52 * * TODO : 生產者消費者模擬 * */ public class TestPC { public static void main(String[] args) { StoreHouse storeHouse = new StoreHouse(); Producer producer = new Producer("生產者", storeHouse); Consumer comsumer = new Consumer("消費者", storeHouse); Thread t1 = new Thread(producer); Thread t2 = new Thread(comsumer); t1.start(); t2.start(); } } |
關于JAVA多線程同步
JAVA多線程同步主要依賴于若干方法和關鍵字
1 wait方法:
該方法屬于Object的方法,wait方法的作用是使得當前調用wait方法所在部分(代碼塊)的線程停止執行,并釋放當前獲得的調用wait所在的代碼塊的鎖,并在其他線程調用notify或者notifyAll方法時恢復到競爭鎖狀態(一旦獲得鎖就恢復執行)。
調用wait方法需要注意幾點:
第一點:wait被調用的時候必須在擁有鎖(即synchronized修飾的)的代碼塊中。
第二點:恢復執行后,從wait的下一條語句開始執行,因而wait方法總是應當在while循環中調用,以免出現恢復執行后繼續執行的條件不滿足卻繼續執行的情況。
第三點:若wait方法參數中帶時間,則除了notify和notifyAll被調用能激活處于wait狀態(等待狀態)的線程進入鎖競爭外,在其他線程中interrupt它或者參數時間到了之后,該線程也將被激活到競爭狀態。
第四點:wait方法被調用的線程必須獲得之前執行到wait時釋放掉的鎖重新獲得才能夠恢復執行。
2 notify方法和notifyAll方法:
notify方法通知調用了wait方法,但是尚未激活的一個線程進入線程調度隊列(即進入鎖競爭),注意不是立即執行。并且具體是哪一個線程不能保證。另外一點就是被喚醒的這個線程一定是在等待wait所釋放的鎖。
notifyAll方法則喚醒所有調用了wait方法,尚未激活的進程進入競爭隊列。
3 synchronized關鍵字:
第一點:synchronized用來標識一個普通方法時,表示一個線程要執行該方法,必須取得該方法所在的對象的鎖。
第二點:synchronized用來標識一個靜態方法時,表示一個線程要執行該方法,必須獲得該方法所在的類的類鎖。
第三點:synchronized修飾一個代碼塊。類似這樣:synchronized(obj) { //code.... }。表示一個線程要執行該代碼塊,必須獲得obj的鎖。這樣做的目的是減小鎖的粒度,保證當不同塊所需的鎖不沖突時不用對整個對象加鎖。利用零長度的byte數組對象做obj非常經濟。
4 atomic action(原子操作):
在JAVA中,以下兩點操作是原子操作。但是c和c++中并不如此。
第一點:對引用變量和除了long和double之外的原始數據類型變量進行讀寫。
第二點:對所有聲明為volatile的變量(包括long和double)的讀寫。
另外:在java.util.concurrent和java.util.concurrent.atomic包中提供了一些不依賴于同步機制的線程安全的類和方法。
附錄:
進程間通信
進程間通信(IPC,Inter-Process Communication),指至少兩個進程或線程間傳送數據或信號的一些技術或方法。線程是計算機系統分配資源的最小單位。每個進程都有自己的一部分獨立的系統資源,彼此是隔離的。為了能使不同的進程互相訪問資源并進行協調工作,才有了進程間通信。這些進程可以運行在同一計算機上或網絡連接的不同計算機上。
進程間通信技術包括消息傳遞、同步、共享內存和遠程過程調用。IPC是一種標準的Unix通信機制。
主要的IPC方法有
(1)管道(Pipe):管道可用于具有親緣關系進程間的通信,允許一個進程和另一個與它有共同祖先的進程之間進行通信。
(2)命名管道(named pipe):命名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關系進程間的通信。命名管道在文件系統中有對應的文件名。命名管道通過命令mkfifo或系統調用mkfifo來創建。
?。?)信號(Signal):信號是比較復雜的通信方式,用于通知接受進程有某種事件發生,除了用于進程間通信外,進程還可以發送信號給進程本身;linux除了支持Unix早期信號語義函數sigal外,還支持語義符合Posix.1標準的信號函數sigaction(實際上,該函數是基于BSD的,BSD為了實現可靠信號機制,又能夠統一對外接口,用sigaction函數重新實現了signal函數)。
(4)消息(Message)隊列:消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節流以及緩沖區大小受限等缺
?。?)共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。
?。?)內存映射(mapped memory):內存映射允許任何多個進程間通信,每一個使用該機制的進程通過把一個共享的文件映射到自己的進程地址空間來實現它。
?。?)信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。
(8)套接口(Socket):更為一般的進程間通信機制,可用于不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。