隨筆-124  評論-194  文章-0  trackbacks-0

          最近需要一個能根據請求數變化的線程池,JAVA有這樣的東西,可是C++下好像一般只是固定大小的線程池。所以就基于ACE寫了個,只做了初步測試。

          主要思想是:
          1. 重載ACE_Task,這相當于是個固定線程池,用一個信號量(ACE_Thread_Semaphore)來記數空閑線程數。
          2. 初始化時根據用戶的輸入,確定最少線程數minnum和最大線程數maxnum,當多個請求到來,并且無空閑線程(信號量用光),判斷總線程數小于maxnum,就開始強迫增加線程數。
          3. 當線程響應完一個請求(任務)后,如果當前任務隊列為空,且線程數大于minnum,就退出本線程。這里做了一個優化,就算滿足條件,線程也會在隊列上再等待10秒,防止線程池抖動帶來不必要的開銷。

          使用:
          重載這個類,重載service_func函數實現自己的任務處理。
          start_pool初始化線程池,之后,就可以用add_task向線程池添加任務。
          它會根據請求的數量自動控制池大小進行處理。
          已經在LINUX下測試通過。由于ACE是跨平臺的,所以這個實現也應該可以在WINDOWS下工作。

          編譯:
          帶THREAD_POOL_UNIT_TEST選項,則編譯出自測程序test
          gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl


          thread_pool.h頭文件:

          #ifndef THREAD_POOL
          #define THREAD_POOL

          #include 
          "ace/Task.h"
          #include 
          "ace/Thread_Mutex.h"
          #include 
          "ace/Thread_Semaphore.h"

          class thread_pool : public ACE_Task<ACE_MT_SYNCH>
          {
          public:
              thread_pool ();

              
          ~thread_pool ();

              
          // begin the initial threads and waiting for request
              int start_pool (
                  
          int minnum = 5// min number of thread
                  int maxnum = 100,  // max number of thread
                  int waitsize = 1024// request queue length
                  int parsize = 1024); // your parameter size


              
          // pending request in work queue
              int wait_cnt ();

              
          // add one task to thread pool
              int add_task (void *arg, int size);

              
          // user defined work thread function
              virtual int service_func (void* arg);

              
          // overide base class function for thread pool logical
              virtual int svc (void);

              
          // not use
              virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

          private:
              
          int minnum_, maxnum_;
              
          int waitsize_, parsize_;

          //    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

              ACE_Thread_Semaphore 
          *pfree_thread_; // for free thread count

              
          long thread_flags_; // ace thread create flag
          }
          ;


          #endif 
          /* THREAD_POOL */




          thread_pool.cpp實現文件:
          #include "thread_pool.h"

          #define THREAD_POOL_DONOT_ACQUIRE    
          0x1001 // do not aquire again in new added thread

          thread_pool::thread_pool () 
          {
              thread_flags_ 
          = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
              pfree_thread_ 
          = NULL;
          }


          thread_pool::
          ~thread_pool () {
              
          if (pfree_thread_)
                  delete pfree_thread_;
          }


          int thread_pool::wait_cnt () {
              
          return this->msg_queue()->message_count ();
          }


          int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg) {
              
          return 0;
          }


          int thread_pool::start_pool (
              
          int minnum,
              
          int maxnum, 
              
          int waitsize, 
              
          int parsize) {
              minnum_ 
          = minnum;
              maxnum_ 
          = maxnum;
              waitsize_ 
          = waitsize;
              parsize_ 
          = parsize;
              
              
          this->msg_queue()->high_water_mark (waitsize * parsize);

              pfree_thread_ 
          = new ACE_Thread_Semaphore (minnum);

              
          int ret = this->activate (thread_flags_, minnum);

              
          return ret;
          }


          int thread_pool::add_task (void *arg, int size) {
              ACE_Message_Block 
          *mb = new ACE_Message_Block (parsize_);
              
              
          // test free threads condition
              if (pfree_thread_->tryacquire () == -1// acquire one free thread to do work
                  printf ("free thread used up\n");

                  
          if (this->thr_count () < maxnum_) {
                      
          this->activate (thread_flags_, 11);
                      
                      printf (
          "new thread release\n");
                      pfree_thread_
          ->release ();
                      
                      printf (
          "new one thread, now %d\n"this->thr_count ());
                  }
           else {
                      printf (
          "can't new more threads, queue len %d\n", wait_cnt () + 1);
                  }

              }
           else {
                  
          // pfree_thread_->release (); // restore cnt, let svc function do acquire work
                  printf ("new task acquire\n");
                  mb
          ->set_flags (THREAD_POOL_DONOT_ACQUIRE);
              }

              
              
          // create msg
              printf ("add msg\n");

              memcpy (mb
          ->wr_ptr (), (char*) arg, size);
                      
              
          this->putq (mb);

              
          return 0;
          }



          int thread_pool::service_func (void* arg) {
              sleep (
          1);
              printf (
          "finished task %d in thread %02X\n"*(int*) arg, (int)ACE_Thread::self ());
              
          return 0;
          }



          int thread_pool::svc (void{
              printf (
          "thread started\n");

              
          while (1)
              
          {                
                  ACE_Message_Block 
          *= 0;
                  ACE_Time_Value wait 
          = ACE_OS::gettimeofday ();
                  wait.sec (wait.sec () 
          + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
                  
                  
          if (this->getq (b, &wait) < 0{
                      
          if (this->thr_count () > minnum_) {
                          printf (
          "over task acquire\n");
                          pfree_thread_
          ->acquire ();
                          printf (
          "delete one thread, now %d\n"this->thr_count ()-1);
                          
                          
          return 0;
                      }
           else 
                          
          continue// I'm the one of last min number of threads
                  }


                  
          if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0{
                      printf (
          "queue task acquire\n");
                      pfree_thread_
          ->acquire (); // I'll use one free thread
                  }

                  
          else 
                      printf (
          "no need to acquire\n");

                  
          this->service_func ((void*)b->rd_ptr());
                                      
                  printf (
          "finished release\n");
                  b
          ->release();
                  
                  pfree_thread_
          ->release (); // added one free thread
              }


              
          return 0;
          }



          #ifdef THREAD_POOL_UNIT_TEST 
          int main (int argc, int ** argv) {
              printf (
          "begin test:\n");
          /*
              ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
              s->release (3);
              s->acquire ();
              s->acquire ();
              s->acquire ();
              printf ("ok");
              return 0;
          */
              
              thread_pool t;
              t.start_pool (
          10100);

              
          for (int i=0; i<200; i++{
                  t.add_task (
          &i, sizeof(i));
                  
          if (i % 20 == 0)
                      sleep (
          1);
              }


              sleep (
          1000);
              
              printf (
          "end test:\n");
              
          return 0;
          }


          #endif

          posted on 2007-08-14 17:56 我愛佳娃 閱讀(6091) 評論(4)  編輯  收藏 所屬分類: 自寫類庫

          評論:
          # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2007-08-14 21:08 | pass86
          怎么寫道了BLOGJAVA.COM,不過學ACE是好的。  回復  更多評論
            
          # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2007-08-16 23:35 | alwayscy
          嘿嘿,大部分BLOGJAVA的同學都只有一個技術博客吧,只要保證大部分與JAVA有關就好了。  回復  更多評論
            
          # re: C++實現的帶最大最小線程數的線程池(基于ACE) 2008-01-13 14:40 | liuruigong
          編譯錯誤修改
          1#include <ace/OS.h>
          2.ACE_OS::sleep();
          3.最好把主函數的sleep(2000) 修改為
          ACE_Thread_Manager::instance()->wait();

          這個線程池寫的不錯  回復  更多評論
            
          # re: C++實現的帶最大最小線程數的線程池(基于ACE)[未登錄] 2008-01-14 22:09 | 我愛佳娃
          以前搞C++,ACE是個不錯的框架,最近接觸了不少JAVA的東西,感覺JAVA這東西琳瑯滿目。  回復  更多評論
            

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 清流县| 深水埗区| 韶山市| 肥东县| 五峰| 砀山县| 定兴县| 罗山县| 手机| 清原| 衢州市| 英德市| 汝州市| 郁南县| 怀化市| 泸西县| 青川县| 濮阳市| 武义县| 南陵县| 班玛县| 乡城县| 象山县| 长海县| 崇州市| 通河县| 稻城县| 宁安市| 凭祥市| 卓尼县| 新乡县| 宜宾市| 东光县| 长乐市| 宜州市| 蒲江县| 开远市| 六枝特区| 兴义市| 都兰县| 宜川县|