Linux線程編程之生產者消費者問題
前言
本文基于順序循環隊列,給出Linux生產者/消費者問題的多線程示例,并討論編程時需要注意的事項。文中涉及的代碼運行環境如下:
本文假定讀者已具備線程同步的基礎知識。
一 順序表循環隊列
1.1 順序循環隊列定義
隊列是一種運算受限的先進先出線性表,僅允許在隊尾插入(入隊),在隊首刪除(出隊)。新元素入隊后成為新的隊尾元素,元素出隊后其后繼元素就成為隊首元素。
隊列的順序存儲結構使用一個數組和兩個整型變量實現,其結構如下:
1 struct Queue{
2 ElemType elem[MaxSize];
3 int head, tail;
4 };
即利用數組elem順序存儲隊列中的所有元素,利用整型變量head和tail分別存儲隊首元素和隊尾(或隊尾下一個)元素的下標位置,分別稱為隊首指針和隊尾指針。MaxSize指定順序存儲隊列的最大長度,即隊列中最多能夠存儲的元素個數。當隊列的順序存儲空間靜態分配時,MaxSize通常為宏定義;若動態分配時,還可為全局或局部變量。
單向順序隊列存在“假溢出”問題,即多次入隊和出隊操作后,隊首空余許多位置而隊尾指針指向隊列中最后一個元素位置,從而無法使新的數據元素入隊。假溢出本質在于隊首和隊尾指針值不能由所定義數組下界值自動轉為數組上界值,解決辦法是將順序隊列構造成一個邏輯上首尾相連的循環表。當隊列的第MaxSize-1個位置被占用后,只要隊首還有可用空間,則把新的數據元素插入隊列第0號位置。因此,順序隊列通常都采用順序循環隊列結構。
下圖所示為MaxSize=4的循環隊列三種狀態圖:
其中,為區分隊空與隊滿,在入隊時少用一個數據(圖中下標為3的位置)。本文約定隊首指針head指向隊首元素,隊尾指針tail指向隊尾元素的下一個元素,以隊尾指針加1等于隊首指針判斷隊滿,即:
初始化:head = tail = 0;
隊列空:head == tail;
隊列滿:(tail + 1) % MaxSize == head;
元素數:num = (tail - head + MaxSize) % MaxSize
%為取模運算符,循環隊列利用數學上的取模運算將首尾巧妙相連。
當約定head指向隊首前一個元素,tail指向隊尾元素時,元素數目仍滿足上面的公式。當約定head指向隊首元素,tail指向隊尾元素時,元素數目為(tail - head + 1 + MaxSize) % MaxSize。
此外,也可另設一個標志以區別隊空和隊滿。該標志可為布爾型空滿標志位,也可為隊列中元素個數。
通過隊首元素位置和隊列中元素個數,可計算出隊尾元素所在位置。亦即,可用隊列中元素個數代替隊尾指針(或隊首指針)。此時,隊列滿足:
隊列空:num == 0;
隊列滿:num == MaxSize;
隊尾位置:tail = (head + num + MaxSize) % MaxSize
1.2 順序循環隊列實現
本節將采用C語言實現一個簡單但卻標準的順序循環隊列函數集。
首先定義循環隊列結構如下:
1 #define QUEUE_SIZE 5 //隊列最大容納QUEUE_SIZE-1個元素
2 typedef struct{
3 int aData[QUEUE_SIZE]; //隊列元素
4 int dwHead; //指向隊首元素
5 int dwTail; //指向隊尾元素的下一個元素
6 }T_QUEUE, *PT_QUEUE;
為求簡單性,元素類型假定為整型。
基于上述結構,定義初始化、入隊出隊和顯式函數:
1 //初始化循環隊列 2 void InitQue(PT_QUEUE ptQue) 3 { 4 memset(ptQue, 0, sizeof(*ptQue)); 5 } 6 7 //向循環隊列中插入元素 8 void EnterQue(PT_QUEUE ptQue, int dwElem) 9 { 10 if(IsQueFull(ptQue)) 11 { 12 printf("Elem %d cannot enter Queue %p(Full)!\n", dwElem, ptQue); 13 return; 14 } 15 ptQue->aData[ptQue->dwTail]= dwElem; 16 ptQue->dwTail = (ptQue->dwTail + 1) % QUEUE_SIZE; 17 } 18 19 //從循環隊列中取出元素 20 int LeaveQue(PT_QUEUE ptQue) 21 { 22 if(IsQueEmpty(ptQue)) 23 { 24 printf("Queue %p is Empty!\n", ptQue); 25 return -1; 26 } 27 int dwElem = ptQue->aData[ptQue->dwHead]; 28 ptQue->dwHead = (ptQue->dwHead + 1) % QUEUE_SIZE; 29 return dwElem; 30 } 31 32 //從隊首至隊尾依次顯示隊中元素值 33 void DisplayQue(PT_QUEUE ptQue) 34 { 35 if(IsQueEmpty(ptQue)) 36 { 37 printf("Queue %p is Empty!\n", ptQue); 38 return; 39 } 40 41 printf("Queue Element: "); 42 int dwIdx = ptQue->dwHead; 43 while((dwIdx % QUEUE_SIZE) != ptQue->dwTail) 44 printf("%d ", ptQue->aData[(dwIdx++) % QUEUE_SIZE]); 45 46 printf("\n"); 47 } |
然后提供判空、判滿、查詢等輔助函數:
1 //判斷循環隊列是否為空 2 int IsQueEmpty(PT_QUEUE ptQue) 3 { 4 return ptQue->dwHead == ptQue->dwTail; 5 } 6 7 //判斷循環隊列是否為滿 8 int IsQueFull(PT_QUEUE ptQue) 9 { 10 return (ptQue->dwTail + 1) % QUEUE_SIZE == ptQue->dwHead; 11 } 12 13 //獲取循環隊列元素數目 14 int QueDataNum(PT_QUEUE ptQue) 15 { 16 return (ptQue->dwTail - ptQue->dwHead + QUEUE_SIZE) % QUEUE_SIZE; 17 } 18 19 //獲取循環隊列隊首位置 20 int GetQueHead(PT_QUEUE ptQue) 21 { 22 return ptQue->dwHead; 23 } 24 //獲取循環隊列隊首元素 25 int GetQueHeadData(PT_QUEUE ptQue) 26 { 27 return ptQue->aData[ptQue->dwHead]; 28 } 29 //獲取循環隊列隊尾位置 30 int GetQueTail(PT_QUEUE ptQue) 31 { 32 return ptQue->dwTail; 33 } |
最后,通過QueueTest()函數來測試隊列函數集:
1 static T_QUEUE gtQueue; 2 void QueueTest(void) 3 { 4 InitQue(>Queue); 5 printf("Enter Queue 1,2,3,4,5...\n"); 6 EnterQue(>Queue, 1); 7 EnterQue(>Queue, 2); 8 EnterQue(>Queue, 3); 9 EnterQue(>Queue, 4); 10 EnterQue(>Queue, 5); 11 printf("Queue Status: Empty(%d), Full(%d)\n", IsQueEmpty(>Queue), IsQueFull(>Queue)); 12 printf("Queue Elem Num: %u\n", QueDataNum(>Queue)); 13 printf("Head: %u(%d)\n", GetQueHead(>Queue), GetQueHeadData(>Queue)); 14 printf("Tail: %u\n", GetQueTail(>Queue)); 15 DisplayQue(>Queue); 16 17 printf("\nLeave Queue...\n"); 18 printf("Leave %d\n", LeaveQue(>Queue)); 19 printf("Leave %d\n", LeaveQue(>Queue)); 20 printf("Leave %d\n", LeaveQue(>Queue)); 21 DisplayQue(>Queue); 22 23 printf("\nEnter Queue 6,7...\n"); 24 EnterQue(>Queue, 6); 25 EnterQue(>Queue, 7); 26 DisplayQue(>Queue); 27 28 printf("\nLeave Queue...\n"); 29 printf("Leave %d\n", LeaveQue(>Queue)); 30 printf("Leave %d\n", LeaveQue(>Queue)); 31 printf("Leave %d\n", LeaveQue(>Queue)); 32 DisplayQue(>Queue); 33 } |
編譯鏈接后,運行結果如下:
1 Enter Queue 1,2,3,4,5... 2 Elem 5 cannot enter Queue 0x8053f9c(Full)! 3 Queue Status: Empty(0), Full(1) 4 Queue Elem Num: 4 5 Head: 0(1) 6 Tail: 4 7 Queue Element: 1 2 3 4 8 9 Leave Queue... 10 Leave 1 11 Leave 2 12 Leave 3 13 Queue Element: 4 14 15 Enter Queue 6,7... 16 Queue Element: 4 6 7 17 18 Leave Queue... 19 Leave 4 20 Leave 6 21 Leave 7 22 Queue 0x8053f9c is Empty! |
可見,隊列行為完全符合期望。
二 生產者消費者問題
2.1 問題概述
本節將討論Linux并發編程中經典的生產者/消費者(producer-consumer)問題。該問題涉及一個大小限定的有界緩沖區(bounded buffer)和兩類線程或進程(生產者和消費者)。
在緩沖區中有可用空間時,一個或一組生產者(線程或進程)將創建的產品(數據條目)放入緩沖區,然后由一個或一組消費者(線程或進程)提取這些產品。產品在生產者和消費者之間通過某種類型的IPC傳遞。
在多個進程間共享一個公共數據緩沖區需要某種形式的共享內存區(如存儲映射或共享內存),而多線程天然地共享存儲空間。為簡便起見,本節的討論僅限于多線程。
多個生產者和消費者線程的場景如下圖所示:
在生產者/消費者問題中,生產者線程必須在緩沖區中有可用空間后才能向其中放置內容,否則將阻塞(進入休眠狀態)直到出現下一個可用的空位置。生產者線程可使用互斥量原子性地檢查緩沖區,而不受其他線程干擾。當發現緩沖區已滿后,生產者阻塞自己并在緩沖區變為非滿時被喚醒,這可由條件變量實現。
消費者線程必須在生產者向緩沖區中寫入之后才能從中提取內容。同理,可用互斥量和條件變量以無競爭的方式等待緩沖區由空變為非空。
2.2 多線程實現
本節將采用隊列模擬任務,給出生產者/消費者問題的多線程示例。
為簡單起見,生產者將隊列元素下標作為元素值入隊,消費者使元素出隊并驗證元素值正確性。
首先定義一組全局數據:
1 #define PRODUCER_NUM 5 //生產者數
2 #define CONSUMER_NUM 3 //消費者數
3 #define PRD_NUM 20 //最多生產的產品數
4 #define DELAY_TIME 3 //生產(或消費)任務之間的最大時間間隔
5
6 #define QUEUE_SIZE (PRD_NUM+1) //隊列最大容納QUEUE_SIZE-1個元素
7
8 T_QUEUE gtQueue;
9 pthread_mutex_t gtQueLock = PTHREAD_MUTEX_INITIALIZER;
10 pthread_cond_t gtPrdCond = PTHREAD_COND_INITIALIZER; //Full->Not Full
11 pthread_cond_t gtCsmCond = PTHREAD_COND_INITIALIZER; //Empty->Not Empty
此處,QUEUE_SIZE按照產品數重新定義。互斥量gtQueLock用于保護全局隊列gtQueue和兩個條件變量。條件變量gtPrdCond用于隊列由滿變為非滿時通知(喚醒)生產者線程,而gtCsmCond用于隊列由空變為非空時通知消費者線程。
若首先創建并啟動生產者線程,再立即或稍候創建消費者線程,則不需要條件變量gtCsmCond(隊列中始終有產品)。本節為全面展現線程間的同步,約定消費者線程創建和啟動完畢之后,再創建生產者線程。這樣,所有消費者線程將會阻塞,在條件變量gtCsmCond的線程列表里條件狀態的改變。生產者線程并開始“產出”后,廣播通知所有消費者線程。反之,因為消費者線程不會全部阻塞,可單播喚醒某個消費者。
初始化隊列和創建線程的主函數如下:
1 int main(void) 2 { 3 InitQue(>Queue); 4 srand(getpid()); //初始化隨機函數發生器 5 6 pthread_t aThrd[CONSUMER_NUM+PRODUCER_NUM]; 7 int dwThrdIdx; 8 for(dwThrdIdx = 0; dwThrdIdx < CONSUMER_NUM; dwThrdIdx++) 9 { //創建CONSUMER_NUM個消費者線程,傳入(void*)dwThrdIdx作為ConsumerThread的參數 10 pthread_create(&aThrd[dwThrdIdx], NULL, ConsumerThread, (void*)dwThrdIdx); 11 } 12 sleep(2); 13 for(dwThrdIdx = 0; dwThrdIdx < PRODUCER_NUM; dwThrdIdx++) 14 { 15 pthread_create(&aThrd[dwThrdIdx], NULL, ProducerThread, (void*)dwThrdIdx); 16 } 17 while(1); 18 return 0 ; 19 } |
生產者線程啟動例程ProducerThread()實現如下:
1 void *ProducerThread(void *pvArg) 2 { 3 pthread_detach(pthread_self()); 4 int dwThrdNo = (int)pvArg; 5 while(1) 6 { 7 pthread_mutex_lock(>QueLock); 8 while(IsQueFull(>Queue)) //隊列由滿變為非滿時,生產者線程被喚醒 9 pthread_cond_wait(>PrdCond, >QueLock); 10 11 EnterQue(>Queue, GetQueTail(>Queue)); //將隊列元素下標作為元素值入隊 12 if(QueDataNum(>Queue) == 1) //當生產者開始產出后,通知(喚醒)消費者線程 13 pthread_cond_broadcast(>CsmCond); 14 printf("[Producer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue)); 15 16 pthread_mutex_unlock(>QueLock); 17 sleep(rand()%DELAY_TIME + 1); 18 } 19 } 隊列變滿時,生產者線程進入休眠狀態。消費者線程取出產品,將隊列由滿變為非滿時,生產者線程再次被喚醒。 消費者線程啟動例程ConsumerThread()實現如下: 1 void *ConsumerThread(void *pvArg) 2 { 3 pthread_detach(pthread_self()); 4 int dwThrdNo = (int)pvArg; 5 while(1) 6 { 7 pthread_mutex_lock(>QueLock); 8 while(IsQueEmpty(>Queue)) //隊列由空變為非空時,消費者線程將被喚醒 9 pthread_cond_wait(>CsmCond, >QueLock); 10 11 if(GetQueHead(>Queue) != GetQueHeadData(>Queue)) 12 { 13 printf("[Consumer %2u]Product: %d, Expect: %d\n", dwThrdNo, 14 GetQueHead(>Queue), GetQueHeadData(>Queue)); 15 exit(0); 16 } 17 LeaveQue(>Queue); 18 if(QueDataNum(>Queue) == (PRD_NUM-1)) //當隊列由滿變為非滿時,通知(喚醒)生產者線程 19 pthread_cond_broadcast(>PrdCond); 20 printf("[Consumer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue)); 21 22 pthread_mutex_unlock(>QueLock); 23 sleep(rand()%DELAY_TIME + 1); 24 } 25 } |
當隊列元素值不符合期望時,打印出錯提示并調用exit()終止進程。
編譯鏈接后,截取部分運行結果如下(因每次執行時線程延時隨機值,故執行順序可能不同):
[wangxiaoyuan_@localhost Thread]$ gcc -Wall -o procon procon.c -pthread
[wangxiaoyuan_@localhost Thread]$ ./procon
[Producer 4]Current Product Num: 1
[Consumer 1]Current Product Num: 0
[Producer 3]Current Product Num: 1
[Consumer 0]Current Product Num: 0
[Producer 2]Current Product Num: 1
[Consumer 2]Current Product Num: 0
[Producer 1]Current Product Num: 1
[Producer 0]Current Product Num: 2
//... ...
[Consumer 0]Current Product Num: 17
[Producer 3]Current Product Num: 18
[Producer 1]Current Product Num: 19
[Consumer 2]Current Product Num: 18
[Producer 4]Current Product Num: 19
[Producer 2]Current Product Num: 20
[Consumer 1]Current Product Num: 19
[Consumer 2]Current Product Num: 18
[Producer 0]Current Product Num: 19
[Producer 4]Current Product Num: 20
//Ctrl + C
2.3 注意事項
本節基于生產者/消費者問題的多線程實現,討論一些編程注意事項。為簡化書寫,省略線程相關函數名的前綴"pthread_"。
1. 互斥量實際上保護的是臨界區中被多個線程或進程共享的數據(shared data)。此外,互斥量是協作性(cooperative)鎖,無法禁止繞過這種機制的訪問。例如,若共享數據為一個鏈表,則操作該鏈表的所有線程都應該在實際操作前獲取該互斥量(但無法防止某個線程不首先獲取該互斥量就操作鏈表)。
對本文實現而言,互斥量gtQueLock保護全局隊列gtQueue。但因為各生產者(或消費者)線程啟動例程相同,內部對該隊列的操作邏輯和順序相同,故gtQueLock看起來在對隊列函數加鎖。進而,在生產者與消費者線程之間,均在加鎖期間操作隊列,因此對隊列函數加鎖近似等效于對隊列數據加鎖。但仍需認識到,這種協作對于頻繁調用的基礎性函數(如庫函數)而言并不現實。當某些調用未經加鎖直接操作時,對于其他使用互斥量的調用而言,互斥保護就失去意義。本文也可考慮在隊列函數集內加鎖(互斥量或讀寫鎖),但因為互斥量gtQueLock同時還保護條件變量,隊列內再加鎖就稍顯復雜,而且線程內互斥量同名時容易產生死鎖。
因此,設計時最好約定所有線程遵守相同的數據訪問規則。
2. 應盡量減少由一個互斥量鎖住的代碼量,以增強并發性。
例如,生產者線程啟動例程ProducerThread()中,對pvArg進行賦值操作時并不需要互斥量保護(不屬于臨界區)。再者,若各生產者線程之間與生產者/消費者線程之間共享不同的數據,則可使用兩個互斥量,但復雜度也隨之上升。
多線程軟件設計經常要考慮加鎖粒度的折中。若使用粗粒度鎖定(coarse-grained locking),即長期持有鎖定以便盡可能降低獲取和釋放鎖的開銷,則可能有很多線程阻塞等待相同的鎖,從而降低并發性;若使用細粒度鎖定(fine-grained locking),即僅在必要時鎖定并在不必要時盡快解鎖以便盡可能提高并發性,則較多的鎖開銷可能會降低系統性能,而且代碼變得相當復雜。因此,設計時應劃分適當數目的鎖來保護數據,以在代碼復雜性和性能優化之間找好平衡點。
3. 條件本身是由互斥量保護的。當線程調用cond_wait()函數基于條件變量阻塞之前,應先鎖定互斥量以避免線程間的競爭和饑餓情況(否則將導致未定義的行為)。
調用cond_wait()時,線程將鎖定的互斥量傳遞給該函數,對條件進行保護。該函數將調用線程放到等待條件的線程列表上,然后對互斥量解鎖。這兩個操作是原子性的,即關閉了條件檢查和線程進入休眠狀態等待條件改變這兩個操作之間的時間通道,這樣線程就不會錯過條件的任何變化。該函數對互斥量解鎖后,其它線程可以獲得加鎖的權利,并訪問和修改臨界資源(條件狀態)。一旦其它某個線程改變了條件使其為真,該線程將通知相應的條件變量喚醒一個或多個正被該條件變量阻塞的線程。cond_wait()返回時,互斥量再次被鎖定并被調用線程擁有,即使返回錯誤時也是如此。
可見,cond_wait()函數解鎖和阻塞調用線程的“原子性”針對其他線程訪問互斥量和條件變量而言。若線程B在即將阻塞的線程A解鎖之后獲得互斥量,則線程隨后B調用cond_ signal/broadcast()函數通告條件狀態變化時,該信號必然在線程A阻塞之后發出(否則將遺漏該信號)。
使用多個互斥量保護基于相同條件變量的cond_wait()調用時,其效果未定義。即當線程基于條件變量阻塞時,該條件變量綁定到唯一的互斥量上,且這種(動態)綁定將在cond_wait()調用返回時結束。
4. 函數cond_signal()喚醒被阻塞在條件變量上的至少一個線程。在多處理器上,該函數在不同處理器上同時單播信號時可能喚醒多個線程。當多個線程阻塞在條件變量上時,哪個或哪些線程被喚醒由線程的調度策略(scheduling policy)所決定。
cond_broadcast()會喚醒阻塞在條件變量上的所有線程。這些線程被喚醒后將再次競爭相應的互斥量。喚醒多個線程的典型場景是讀出者與寫入者問題。當一個寫入者完成訪問并釋放相應的鎖后,它希望喚醒所有正在排隊的讀出者,因為同時允許多個讀出者訪問。
若已確定只有一個等待者需要喚醒,且喚醒哪個等待者無關緊要,則可使用單播發送;所有其他情況下都應使用廣播發送(更為安全)。
若當前沒有任何線程基于條件變量阻塞,則調用cond_signal/broadcast()不起作用。
5. 虛假喚醒(spurious wakeup)指沒有線程明確調用cond_signal/broadcast()時,cond_wait()偶爾也會返回;或者條件狀態尚不滿足時就調用cond_signal/broadcast()。此時,線程雖被喚醒但條件并不成立,若不再次檢查條件而往下執行,很可能導致后續的處理出現錯誤。因此,當從cond_wait()返回時,線程應重新測試條件成立與否。該過程一般用while循環實現,即:
1 pthread_mutex_lock();
2 while(IsConditionFalse)
3 pthread_cond_wait();
4 //Do something
5 pthread_mutex_unlock();
使用while循環不僅能避免虛假喚醒造成的錯誤,還能避免喚醒線程間競爭導致的“驚群效應”。
例如,ProducerThread()內,調用cond_wait()對互斥量自動解鎖后,在允許消費者線程修改條件的同時,也允許其他生產者線程調用該函數依次阻塞。當這些線程被喚醒時(如隊列由滿變為非滿),會再次競爭相應的互斥量。獲得互斥量的那個線程進入臨界區處理,這可能改變測試條件(如產出一件使得隊列再次變滿)。該線程釋放互斥量后,其他某個處于等待狀態的線程獲得該互斥量時,雖然cond_wait()成功返回但很可能條件已不成立。因此,調用cond_wait()成功返回時,線程需要重新測試條件是否滿足。
調用cond_timedwait()的情況與之類似。由于等待超時與條件改變之間存在無法避免的競爭,當該函數返回超時錯誤時條件仍可能成立。
6. 若線程未持有與條件相關聯的互斥量,則調用cond_signal/broadcast()可能會產生喚醒丟失問題。
當滿足以下所有條件時,即會出現喚醒丟失問題:
1) 一個線程調用cond_signal/broadcast()函數;
2) 另一個線程已測試該條件,但尚未調用cond_wait()函數;
3) 沒有正處于阻塞等待狀態的線程。
只要僅在持有關聯互斥量的同時修改所測試的條件,即可調用cond_signal/broadcast(),而無論這些函數是否持有關聯的互斥量。
可見,調用cond_signal/broadcast()時若無線程等待條件變量,則該信號將被丟失。因此,發送信號前最好檢查下是否有正在等待的線程。例如,可維護一個等待線程計數器,在觸發條件變量前檢查該計數器。本文線程代碼內對隊列元素數目的檢查與之類似。
對于等待條件的線程而言,若錯失信號(如啟動過遲),則會一直阻塞到其它線程再次發送信號到該條件變量。
7. ProducerThread()中,cond_broadcast()函數由當前鎖住互斥量的生產者線程調用,本函數將發送信號給該互斥量所關聯的條件變量。當該條件變量被發送信號后,系統立即調度等待在其上的消費者線程;該線程開始運行后試圖獲取互斥量,但該互斥量仍由生產者線程所持有。因此被喚醒的消費者線程被迫進入休眠狀態,直至生產者線程釋放互斥量后再次被喚醒。這種不必要的上下文切換可能會嚴重影響性能。
為避免這種加鎖沖突(以提高效率),可將判斷隊列元素數目的語句值賦給一個局部變量bHasOnePrd,直到釋放互斥量gtQueLock后才判斷bHasOnePrd并向與之關聯的條件變量gtCsmCond發送信號。
Posix標準規定,調用cond_signal/broadcast()的線程不必是與之關聯的互斥量的當前擁有者,即允許在釋放互斥量后才給與之關聯的條件變量發送信號。若程序不關心線程可預知的調度行為,最好在鎖定區域以外調用cond_signal/broadcast()。
當然,本文所示的生產者和消費者線程中,即使不判斷隊列元素數目而直接發送信號也可以(通過修改隊列指針間接地改變條件狀態,雖然會發送一些無效信號)。
8. 當只有一個生產者和一個消費者時,通過謹慎操作隊列可避免線程間的原子性同步。例如,生產者線程僅更新隊尾指針,消費者線程僅更新隊首指針。簡化的示例如下:
1 #define QUEUE_SIZE 20 2 volatile unsigned int gdwPrdNum = 0, gdwCsmNum = 0; 3 int gQueue[QUEUE_SIZE] = {0}; 4 5 void *Producer(void *pvArg) { 6 while(1) { 7 while(gdwPrdNum - gdwCsmNum == QUEUE_SIZE) 8 ; //Full 9 10 gQueue[gdwPrdNum % QUEUE_SIZE]++; 11 gdwPrdNum++; 12 } 13 pthread_exit(0); 14 } 15 16 void *Consumer(void *pvArg) { 17 while(1) { 18 while(gdwPrdNum - gdwCsmNum == 0) 19 ; //Empty 20 21 gQueue[gdwCsmNum % QUEUE_SIZE]--; 22 gdwCsmNum++; 23 } 24 pthread_exit(0); 25 } |
該例中使用輪詢(polling)方式,可在不依賴互斥量和條件變量的情況下高效地共享數據。判空和判滿的循環保證兩個線程不可能同時操作同一個隊列元素。當線程被取消時,可觀察到隊列中元素值為全零(若無循環則為隨機值)。雖然輪詢方式通常比較低效,但在多處理器環境中線程休眠和喚醒的頻率較小,故繞開數據原子性操作是有利的。
另一則實例則可參考《守護進程接收終端輸入的一種變通性方法》一文。
9. 使用Posix信號量可模擬互斥量和條件變量,而且通常更有優勢。
當函數sem_wait()和sem_post()用于線程內時,兩個調用間的區域就是所要保護的臨界區代碼;當用于線程間時,則與條件變量等效。
此外,信號量還可用作資源計數器,即初始化信號量的值作為某個資源當前可用的數量,使用時遞減釋放時遞增。這樣,原先一些保存隊列狀態的變量都不再需要。
最后,內核會記錄信號的存在,不會將信號丟失;而喚醒條件變量時若沒有線程在等待該條件變量,信號將被丟失。
posted on 2014-10-30 12:04 順其自然EVO 閱讀(544) 評論(0) 編輯 收藏 所屬分類: 測試學習專欄 、linux