生產(chǎn)者消費者問題理解與Java實現(xiàn)
生產(chǎn)者消費者問題(英語:Producer-consumer problem),也稱有限緩沖問題(英語:Bounded-buffer problem),是一個多線程同步問題的經(jīng)典案例。該問題描述了兩個共享固定大小緩沖區(qū)的線程——即所謂的“生產(chǎn)者”和“消費者”——在實際運行時會發(fā)生的問題。生產(chǎn)者的主要作用是生成一定量的數(shù)據(jù)放到緩沖區(qū)中,然后重復此過程。與此同時,消費者也在緩沖區(qū)消耗這些數(shù)據(jù)。該問題的關(guān)鍵就是要保證生產(chǎn)者不會在緩沖區(qū)滿時加入數(shù)據(jù),消費者也不會在緩沖區(qū)中空時消耗數(shù)據(jù)。
要解決該問題,就必須讓生產(chǎn)者在緩沖區(qū)滿時休眠(要么干脆就放棄數(shù)據(jù)),等到下次消費者消耗緩沖區(qū)中的數(shù)據(jù)的時候,生產(chǎn)者才能被喚醒,開始往緩沖區(qū)添加數(shù)據(jù)。同樣,也可以讓消費者在緩沖區(qū)空時進入休眠,等到生產(chǎn)者往緩沖區(qū)添加數(shù)據(jù)之后,再喚醒消費者。通常采用進程間通信的方法解決該問題,常用的方法有信號燈法等。如果解決方法不夠完善,則容易出現(xiàn)死鎖的情況。出現(xiàn)死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產(chǎn)者和消費者的情形。
Java代碼模擬生產(chǎn)者-消費者
產(chǎn)品類
package org.dennist.thread.demo; /** * * Product.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發(fā)送郵件</a> * * @since : 1.0 創(chuàng)建時間: 2013-2-25 上午09:03:38 * * TODO : class Product.java is used for ... * */ public class Product { //產(chǎn)品類 private int productId = 0; public Product(int productId){ this.productId = productId; } public int getProductId() { return productId; } public void setProductId(int productId) { this.productId = productId; } @Override public String toString() { return ""+productId; } } |
倉庫類
package org.dennist.thread.demo; /** * * StoreHouse.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發(fā)送郵件</a> * * @since : 1.0 創(chuàng)建時間: 2013-2-25 上午08:55:33 * * TODO : 倉庫 * */ public class StoreHouse { private int base = 0; private int top = 0; //倉庫大小 private Product[] products = new Product[10]; /** * 生產(chǎn)產(chǎn)品 * @param product */ public synchronized void push(Product product){ if(top==products.length){ //如果倉庫已滿,等待消費 try { System.out.println("倉庫已滿,正在等待消費.."); wait(); }catch (InterruptedException e) { System.out.println("stop push product because other reasons"); } } //倉庫未滿,將生產(chǎn)的產(chǎn)品入庫 products[top] = product; //庫中產(chǎn)品數(shù)量+1 top++; } /** * 消費產(chǎn)品 * @return */ public synchronized Product pop() { Product product = null; while (top == base) { //倉庫未空,不能消費 notify(); try { System.out.println("倉庫已空,正等待生產(chǎn)..."); wait(); } catch (InterruptedException e) { System.out.println("stop push product because other reasons"); } } //倉庫未空,等待消費 top--; product = products[top]; products[top] = null; return product; } } |
生產(chǎn)者類
package org.dennist.thread.demo; /** * * Producer.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發(fā)送郵件</a> * * @since : 1.0 創(chuàng)建時間: 2013-2-25 上午08:53:16 * * TODO : 生產(chǎn)者 * */ public class Producer implements Runnable{ private String producerName ; private StoreHouse storeHouse ; public Producer(String producerName, StoreHouse storeHouse) { this.producerName = producerName; this.storeHouse = storeHouse; } public void setProducerName(String producerName) { this.producerName = producerName; } public String getProducerName() { return producerName; } @Override public void run() { execProcuct(); } private void execProcuct() { int i = 0; while(true){ i++; Product pro = new Product(i); storeHouse.push(pro); System.out.println(getProducerName() + " 生產(chǎn)了 " + pro); try { Thread.sleep(2000); } catch (InterruptedException e) { return; } } } } 消費者類 |
package org.dennist.thread.demo; /** * * Consumer.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發(fā)送郵件</a> * * @since : 1.0 創(chuàng)建時間: 2013-2-25 上午08:53:47 * * TODO : 消費者 * */ public class Consumer implements Runnable{ private String consumerName = null; private StoreHouse storeHouse = null; public Consumer(String consumerName, StoreHouse storeHouse) { this.consumerName = consumerName; this.storeHouse = storeHouse; } public void setConsumerName(String consumerName) { this.consumerName = consumerName; } public String getConsumerName() { return consumerName; } public void execConsume() { while (true) { System.out.println(getConsumerName() + " 消費了 " + storeHouse.pop()); try { Thread.sleep(5000); } catch (InterruptedException e) { return; } } } @Override public void run() { execConsume(); } } |
測試主類
package org.dennist.thread.demo; /** * * TestPC.java * * @version : 1.1 * * @author : 蘇若年 <a href="mailto:DennisIT@163.com">發(fā)送郵件</a> * * @since : 1.0 創(chuàng)建時間: 2013-2-25 上午09:18:52 * * TODO : 生產(chǎn)者消費者模擬 * */ public class TestPC { public static void main(String[] args) { StoreHouse storeHouse = new StoreHouse(); Producer producer = new Producer("生產(chǎn)者", storeHouse); Consumer comsumer = new Consumer("消費者", storeHouse); Thread t1 = new Thread(producer); Thread t2 = new Thread(comsumer); t1.start(); t2.start(); } } |
關(guān)于JAVA多線程同步
JAVA多線程同步主要依賴于若干方法和關(guān)鍵字
1 wait方法:
該方法屬于Object的方法,wait方法的作用是使得當前調(diào)用wait方法所在部分(代碼塊)的線程停止執(zhí)行,并釋放當前獲得的調(diào)用wait所在的代碼塊的鎖,并在其他線程調(diào)用notify或者notifyAll方法時恢復到競爭鎖狀態(tài)(一旦獲得鎖就恢復執(zhí)行)。
調(diào)用wait方法需要注意幾點:
第一點:wait被調(diào)用的時候必須在擁有鎖(即synchronized修飾的)的代碼塊中。
第二點:恢復執(zhí)行后,從wait的下一條語句開始執(zhí)行,因而wait方法總是應當在while循環(huán)中調(diào)用,以免出現(xiàn)恢復執(zhí)行后繼續(xù)執(zhí)行的條件不滿足卻繼續(xù)執(zhí)行的情況。
第三點:若wait方法參數(shù)中帶時間,則除了notify和notifyAll被調(diào)用能激活處于wait狀態(tài)(等待狀態(tài))的線程進入鎖競爭外,在其他線程中interrupt它或者參數(shù)時間到了之后,該線程也將被激活到競爭狀態(tài)。
第四點:wait方法被調(diào)用的線程必須獲得之前執(zhí)行到wait時釋放掉的鎖重新獲得才能夠恢復執(zhí)行。
2 notify方法和notifyAll方法:
notify方法通知調(diào)用了wait方法,但是尚未激活的一個線程進入線程調(diào)度隊列(即進入鎖競爭),注意不是立即執(zhí)行。并且具體是哪一個線程不能保證。另外一點就是被喚醒的這個線程一定是在等待wait所釋放的鎖。
notifyAll方法則喚醒所有調(diào)用了wait方法,尚未激活的進程進入競爭隊列。
3 synchronized關(guān)鍵字:
第一點:synchronized用來標識一個普通方法時,表示一個線程要執(zhí)行該方法,必須取得該方法所在的對象的鎖。
第二點:synchronized用來標識一個靜態(tài)方法時,表示一個線程要執(zhí)行該方法,必須獲得該方法所在的類的類鎖。
第三點:synchronized修飾一個代碼塊。類似這樣:synchronized(obj) { //code.... }。表示一個線程要執(zhí)行該代碼塊,必須獲得obj的鎖。這樣做的目的是減小鎖的粒度,保證當不同塊所需的鎖不沖突時不用對整個對象加鎖。利用零長度的byte數(shù)組對象做obj非常經(jīng)濟。
4 atomic action(原子操作):
在JAVA中,以下兩點操作是原子操作。但是c和c++中并不如此。
第一點:對引用變量和除了long和double之外的原始數(shù)據(jù)類型變量進行讀寫。
第二點:對所有聲明為volatile的變量(包括long和double)的讀寫。
另外:在java.util.concurrent和java.util.concurrent.atomic包中提供了一些不依賴于同步機制的線程安全的類和方法。
附錄:
進程間通信
進程間通信(IPC,Inter-Process Communication),指至少兩個進程或線程間傳送數(shù)據(jù)或信號的一些技術(shù)或方法。線程是計算機系統(tǒng)分配資源的最小單位。每個進程都有自己的一部分獨立的系統(tǒng)資源,彼此是隔離的。為了能使不同的進程互相訪問資源并進行協(xié)調(diào)工作,才有了進程間通信。這些進程可以運行在同一計算機上或網(wǎng)絡連接的不同計算機上。
進程間通信技術(shù)包括消息傳遞、同步、共享內(nèi)存和遠程過程調(diào)用。IPC是一種標準的Unix通信機制。
主要的IPC方法有
(1)管道(Pipe):管道可用于具有親緣關(guān)系進程間的通信,允許一個進程和另一個與它有共同祖先的進程之間進行通信。
(2)命名管道(named pipe):命名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關(guān)系進程間的通信。命名管道在文件系統(tǒng)中有對應的文件名。命名管道通過命令mkfifo或系統(tǒng)調(diào)用mkfifo來創(chuàng)建。
(3)信號(Signal):信號是比較復雜的通信方式,用于通知接受進程有某種事件發(fā)生,除了用于進程間通信外,進程還可以發(fā)送信號給進程本身;linux除了支持Unix早期信號語義函數(shù)sigal外,還支持語義符合Posix.1標準的信號函數(shù)sigaction(實際上,該函數(shù)是基于BSD的,BSD為了實現(xiàn)可靠信號機制,又能夠統(tǒng)一對外接口,用sigaction函數(shù)重新實現(xiàn)了signal函數(shù))。
(4)消息(Message)隊列:消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權(quán)限的進程可以向隊列中添加消息,被賦予讀權(quán)限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節(jié)流以及緩沖區(qū)大小受限等缺
(5)共享內(nèi)存:使得多個進程可以訪問同一塊內(nèi)存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設(shè)計的。往往與其它通信機制,如信號量結(jié)合使用,來達到進程間的同步及互斥。
(6)內(nèi)存映射(mapped memory):內(nèi)存映射允許任何多個進程間通信,每一個使用該機制的進程通過把一個共享的文件映射到自己的進程地址空間來實現(xiàn)它。
(7)信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。
(8)套接口(Socket):更為一般的進程間通信機制,可用于不同機器之間的進程間通信。起初是由Unix系統(tǒng)的BSD分支開發(fā)出來的,但現(xiàn)在一般可以移植到其它類Unix系統(tǒng)上:Linux和System V的變種都支持套接字。