Let's start with a brief analysis of condition_impl.
In its constructor, condition_impl creates two semaphores, m_gate and m_queue, and one mutex, m_mutex (similar to boost::mutex, except that boost::mutex is built on a CriticalSection):
m_queue
Acts as the wait queue for all currently waiting threads. When the constructor calls CreateSemaphore to create it, the lMaximumCount parameter is set to (std::numeric_limits<long>::max)(). Even so, to guard against an enormous number of waiting threads (more than the maximum value of long), a thread that enters the waiting state through condition::wait first calls:
WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
and blocks there until it is woken up. It is hard to imagine an application that would need to handle that many threads, though.
m_mutex
Used for internal synchronization.
m_gate, however, puzzles me. I studied the condition_impl implementation carefully and still cannot see why the author introduced it: with m_mutex already there for synchronization, adding m_gate on top of it leaves me a little confused.
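To make the m_queue idea a bit more concrete, here is a minimal portable sketch. It is not the Boost code: Semaphore and wake_all_waiters are hypothetical names of my own, built on std::mutex and std::condition_variable in place of the Win32 semaphore handle, and the assumption that a notification simply releases one permit per waiter is mine as well. It leaves m_gate and m_mutex out completely.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical portable counting semaphore, standing in for the Win32
// semaphore handle behind m_queue.
class Semaphore {
public:
    explicit Semaphore(long initial) : count_(initial) {}

    // Blocks until a permit is available, then takes it
    // (the role WaitForSingleObject(handle, INFINITE) plays on Windows).
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    // Adds n permits (the role ReleaseSemaphore(handle, n, 0) plays).
    void release(long n) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            count_ += n;
        }
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    long count_;
};

// Starts `waiters` threads that block on the queue semaphore, releases one
// permit per waiter, and returns how many threads were woken.
int wake_all_waiters(int waiters) {
    Semaphore queue(0);      // starts with no permits, so every waiter blocks
    std::mutex counterMutex;
    int woken = 0;

    std::vector<std::thread> threads;
    for (int i = 0; i < waiters; ++i) {
        threads.emplace_back([&] {
            queue.acquire(); // the "wait" side
            std::lock_guard<std::mutex> lock(counterMutex);
            ++woken;
        });
    }

    queue.release(waiters);  // the "notify_all" side: one permit per waiter
    for (std::thread& t : threads)
        t.join();
    return woken;
}
```

Calling wake_all_waiters(3) starts three waiters and wakes all of them. A notify_one in this picture would be release(1).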
Below is a simplified version of the do_wait method that condition::wait calls:
template <typename M>
void do_wait(M& mutex)
{
    // lock_ops and state are declared in the full source; this simplified listing omits them.
    m_impl.enter_wait();
    // Unlock the scoped_lock passed in so that other threads can lock it and do their work;
    // otherwise the condition this thread waits for could never occur, because no other thread could reach the resource.
    lock_ops::unlock(mutex, state);
    // Block until another thread calls notify_one or notify_all.
    m_impl.do_wait();
    // Re-lock the scoped_lock to regain exclusive access to the resource.
    lock_ops::lock(mutex, state);
}
condition::timed_wait is implemented in much the same way, while notify_one and notify_all simply forward the call to m_impl, so I will not go into them here.
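The unlock, block, relock sequence in do_wait is what makes the usual calling pattern work. As a small illustration, here is a sketch that uses std::condition_variable as a stand-in for boost::condition (an assumption on my part, chosen so that the example needs nothing but the standard library). The function name wait_for_value is hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns the value the waiting thread observes once it has been woken.
int wait_for_value() {
    std::mutex mutex;
    std::condition_variable cond;
    bool ready = false;
    int value = 0;

    std::thread setter([&] {
        // The setter can take this lock either because it got here first or
        // because wait() below released the mutex while blocking.
        std::lock_guard<std::mutex> lock(mutex);
        value = 42;
        ready = true;
        cond.notify_one();
    });

    int seen = 0;
    {
        std::unique_lock<std::mutex> lock(mutex);
        // wait() unlocks the mutex, blocks, and locks it again before
        // returning. The loop re-checks the flag after every wakeup.
        while (!ready)
            cond.wait(lock);
        seen = value; // read under the re-acquired lock
    }

    setter.join();
    return seen;
}
```

If wait() kept the mutex locked while blocking, the setter could never acquire it and the waiting thread would hang forever, which is exactly the situation the comment on the unlock step describes.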
Although condition is fairly complex internally, it is quite easy to use. Below is an example that uses condition to synchronize multiple producers and multiple consumers:
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

#include <iostream>
#include <stdlib.h>  // for srand(), rand()
#include <time.h>    // for time()

#include <Windows.h> // for Sleep; change it for other platforms. We could use
                     // boost::thread::sleep, but it is too inconvenient.

typedef boost::mutex::scoped_lock scoped_lock;
boost::mutex io_mutex; // serializes access to std::cout

class Product
{
    int num;
public:
    Product(int num) : num(num) {}

    friend std::ostream& operator<< (std::ostream& os, const Product& product)
    {
        return os << product.num;
    }
};

class Mediator
{
private:
    boost::condition cond;
    boost::mutex mutex;

    Product** pSlot;           // product buffer/slot
    unsigned int slotCount,    // buffer size
                 productCount; // current product count
    bool stopFlag;             // should all threads stop or not

public:
    Mediator(unsigned int slotCount)
        : slotCount(slotCount), productCount(0), stopFlag(false)
    {
        pSlot = new Product*[slotCount];
    }

    virtual ~Mediator()
    {
        // delete any products that were never consumed
        for (unsigned int i = 0; i < productCount; i++)
        {
            delete pSlot[i];
        }
        delete [] pSlot;
    }

    // stopFlag is shared between threads, so it is read and written under the
    // mutex. Setting it under the mutex also means a thread cannot miss the
    // following NotifyAll() between checking the flag and starting to wait.
    bool Stop() { scoped_lock lock(mutex); return stopFlag; }
    void Stop(bool stop) { scoped_lock lock(mutex); stopFlag = stop; }

    void NotifyAll() // notify all blocked threads to exit
    {
        cond.notify_all();
    }

    bool Put(Product* pProduct)
    {
        scoped_lock lock(mutex);
        if (productCount == slotCount)
        {
            {
                scoped_lock ioLock(io_mutex);
                std::cout << "Buffer is full. Waiting..." << std::endl;
            }
            while (!stopFlag && (productCount == slotCount))
                cond.wait(lock);
        }
        if (stopFlag) // it may be notified by main thread to quit.
            return false;

        pSlot[ productCount++ ] = pProduct;
        // notify_all rather than notify_one: producers and consumers wait on
        // the same condition, so a single wakeup could go to a thread that
        // cannot proceed. Once the lock is released, a consumer may take and
        // delete *pProduct, so the caller must not touch it again.
        cond.notify_all();

        return true;
    }

    bool Get(Product** ppProduct)
    {
        scoped_lock lock(mutex);
        if (productCount == 0)
        {
            {
                scoped_lock ioLock(io_mutex);
                std::cout << "Buffer is empty. Waiting..." << std::endl;
            }
            while (!stopFlag && (productCount == 0))
                cond.wait(lock);
        }
        if (stopFlag) // it may be notified by main thread to quit.
        {
            *ppProduct = NULL;
            return false;
        }

        *ppProduct = pSlot[--productCount];
        cond.notify_all(); // same reasoning as in Put

        return true;
    }
};

class Producer
{
private:
    Mediator* pMediator;
    static unsigned int num;
    unsigned int id; // Producer id

public:
    Producer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

    void operator() ()
    {
        Product* pProduct;
        srand( (unsigned)time( NULL ) + id ); // each thread needs a different seed
        while (!pMediator->Stop())
        {
            pProduct = new Product( rand() % 100 );
            // must print product info before calling Put, as Put may wake up
            // a consumer that takes and deletes *pProduct
            {
                scoped_lock lock(io_mutex);
                std::cout << "Producer[" << id << "] produces Product["
                          << *pProduct << "]" << std::endl;
            }
            if (!pMediator->Put(pProduct)) // only fails when notified by main thread to exit
                delete pProduct;

            Sleep(100);
        }
    }
};

unsigned int Producer::num = 1;

class Consumer
{
private:
    Mediator* pMediator;
    static unsigned int num;
    unsigned int id; // Consumer id

public:
    Consumer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

    void operator() ()
    {
        Product* pProduct = NULL;
        while (!pMediator->Stop())
        {
            if (pMediator->Get(&pProduct))
            {
                scoped_lock lock(io_mutex);
                std::cout << "Consumer[" << id << "] is consuming Product["
                          << *pProduct << "]" << std::endl;
                delete pProduct;
            }

            Sleep(100);
        }
    }
};

unsigned int Consumer::num = 1;

int main()
{
    Mediator mediator(2); // we have only 2 slots to put products in

    // we have 2 producers
    Producer producer1(&mediator);
    boost::thread thrd1(producer1);
    Producer producer2(&mediator);
    boost::thread thrd2(producer2);
    // and we have 3 consumers
    Consumer consumer1(&mediator);
    boost::thread thrd3(consumer1);
    Consumer consumer2(&mediator);
    boost::thread thrd4(consumer2);
    Consumer consumer3(&mediator);
    boost::thread thrd5(consumer3);

    // wait 1 second
    Sleep(1000);
    // and then try to stop all threads
    mediator.Stop(true);
    mediator.NotifyAll();

    // wait for all threads to exit
    thrd1.join();
    thrd2.join();
    thrd3.join();
    thrd4.join();
    thrd5.join();

    return 0;
}
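For comparison, here is a sketch of a common variation on the same idea: a bounded buffer with two conditions, one for "not full" and one for "not empty". It is written against the C++11 standard library rather than Boost so that it builds anywhere, and every name in it (BoundedBuffer, run_pipeline, and so on) is hypothetical. It also differs from the example above in one deliberate way: after stop() the consumers drain whatever is left before exiting, so the number of consumed items is predictable.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity)
        : capacity_(capacity), stopped_(false) {}

    // Blocks while the buffer is full; returns false once stop() was called.
    bool put(int item) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return stopped_ || items_.size() < capacity_; });
        if (stopped_)
            return false;
        items_.push(item);
        notEmpty_.notify_one(); // only consumers wait on notEmpty_
        return true;
    }

    // Blocks while the buffer is empty; returns false once the buffer is
    // both stopped and drained.
    bool get(int& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return stopped_ || !items_.empty(); });
        if (items_.empty())
            return false;
        item = items_.front();
        items_.pop();
        notFull_.notify_one();  // only producers wait on notFull_
        return true;
    }

    // Sets the flag under the mutex, then wakes every blocked thread.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        notFull_.notify_all();
        notEmpty_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
    std::queue<int> items_;
    std::size_t capacity_;
    bool stopped_;
};

// Runs the given number of producers and consumers over a 2-slot buffer and
// returns the total number of items consumed.
int run_pipeline(int producers, int itemsEach, int consumers) {
    BoundedBuffer buffer(2);
    std::mutex totalMutex;
    int total = 0;

    std::vector<std::thread> consumerThreads;
    for (int c = 0; c < consumers; ++c) {
        consumerThreads.emplace_back([&] {
            int item = 0;
            int local = 0;
            while (buffer.get(item))
                ++local;
            std::lock_guard<std::mutex> lock(totalMutex);
            total += local;
        });
    }

    std::vector<std::thread> producerThreads;
    for (int p = 0; p < producers; ++p) {
        producerThreads.emplace_back([&buffer, itemsEach] {
            for (int i = 0; i < itemsEach; ++i)
                buffer.put(i);
        });
    }

    for (std::thread& t : producerThreads)
        t.join();
    buffer.stop(); // all items have been put; let the consumers drain and exit
    for (std::thread& t : consumerThreads)
        t.join();
    return total;
}
```

With separate conditions, each one only ever has one kind of waiter, so notify_one always reaches a thread that can make progress. The price is one more object to keep track of.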