隨筆-124  評論-194  文章-0  trackbacks-0

          最近需要一個能根據(jù)請求數(shù)變化的線程池,JAVA有這樣的東西,可是C++下好像一般只是固定大小的線程池。所以就基于ACE寫了個,只做了初步測試。

          主要思想是:
          1. 重載ACE_Task,這相當(dāng)于是個固定線程池,用一個信號量(ACE_Thread_Semaphore)來記數(shù)空閑線程數(shù)。
          2. 初始化時根據(jù)用戶的輸入,確定最少線程數(shù)minnum和最大線程數(shù)maxnum,當(dāng)多個請求到來,并且無空閑線程(信號量用光),判斷總線程數(shù)小于maxnum,就開始強迫增加線程數(shù)。
          3. 當(dāng)線程響應(yīng)完一個請求(任務(wù))后,如果當(dāng)前任務(wù)隊列為空,且線程數(shù)大于minnum,就退出本線程。這里做了一個優(yōu)化,就算滿足條件,線程也會在隊列上再等待10秒,防止線程池抖動帶來不必要的開銷。

          使用:
          重載這個類,重載service_func函數(shù)實現(xiàn)自己的任務(wù)處理。
          start_pool初始化線程池,之后,就可以用add_task向線程池添加任務(wù)。
          它會根據(jù)請求的數(shù)量自動控制池大小進行處理。
          已經(jīng)在LINUX下測試通過。由于ACE是跨平臺的,所以這個實現(xiàn)也應(yīng)該可以在WINDOWS下工作。

          編譯:
          帶THREAD_POOL_UNIT_TEST選項,則編譯出自測程序test
          gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl


          thread_pool.h頭文件:

          #ifndef THREAD_POOL
          #define THREAD_POOL

          #include 
          "ace/Task.h"
          #include 
          "ace/Thread_Mutex.h"
          #include 
          "ace/Thread_Semaphore.h"

          class thread_pool : public ACE_Task<ACE_MT_SYNCH>
          {
          public:
              thread_pool ();

              
          ~thread_pool ();

              
          // begin the initial threads and waiting for request
              int start_pool (
                  
          int minnum = 5// min number of thread
                  int maxnum = 100,  // max number of thread
                  int waitsize = 1024// request queue length
                  int parsize = 1024); // your parameter size


              
          // pending request in work queue
              int wait_cnt ();

              
          // add one task to thread pool
              int add_task (void *arg, int size);

              
          // user defined work thread function
              virtual int service_func (void* arg);

              
          // overide base class function for thread pool logical
              virtual int svc (void);

              
          // not use
              virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

          private:
              
          int minnum_, maxnum_;
              
          int waitsize_, parsize_;

          //    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

              ACE_Thread_Semaphore 
          *pfree_thread_; // for free thread count

              
          long thread_flags_; // ace thread create flag
          }
          ;


          #endif 
          /* THREAD_POOL */




          thread_pool.cpp實現(xiàn)文件:
          #include "thread_pool.h"

          #define THREAD_POOL_DONOT_ACQUIRE    
          0x1001 // do not aquire again in new added thread

          thread_pool::thread_pool () 
          {
              thread_flags_ 
          = THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
              pfree_thread_ 
          = NULL;
          }


          thread_pool::
          ~thread_pool () {
              
          if (pfree_thread_)
                  delete pfree_thread_;
          }


          int thread_pool::wait_cnt () {
              
          return this->msg_queue()->message_count ();
          }


          int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg) {
              
          return 0;
          }


          int thread_pool::start_pool (
              
          int minnum,
              
          int maxnum, 
              
          int waitsize, 
              
          int parsize) {
              minnum_ 
          = minnum;
              maxnum_ 
          = maxnum;
              waitsize_ 
          = waitsize;
              parsize_ 
          = parsize;
              
              
          this->msg_queue()->high_water_mark (waitsize * parsize);

              pfree_thread_ 
          = new ACE_Thread_Semaphore (minnum);

              
          int ret = this->activate (thread_flags_, minnum);

              
          return ret;
          }


          int thread_pool::add_task (void *arg, int size) {
              ACE_Message_Block 
          *mb = new ACE_Message_Block (parsize_);
              
              
          // test free threads condition
              if (pfree_thread_->tryacquire () == -1// acquire one free thread to do work
                  printf ("free thread used up\n");

                  
          if (this->thr_count () < maxnum_) {
                      
          this->activate (thread_flags_, 11);
                      
                      printf (
          "new thread release\n");
                      pfree_thread_
          ->release ();
                      
                      printf (
          "new one thread, now %d\n"this->thr_count ());
                  }
           else {
                      printf (
          "can't new more threads, queue len %d\n", wait_cnt () + 1);
                  }

              }
           else {
                  
          // pfree_thread_->release (); // restore cnt, let svc function do acquire work
                  printf ("new task acquire\n");
                  mb
          ->set_flags (THREAD_POOL_DONOT_ACQUIRE);
              }

              
              
          // create msg
              printf ("add msg\n");

              memcpy (mb
          ->wr_ptr (), (char*) arg, size);
                      
              
          this->putq (mb);

              
          return 0;
          }



          int thread_pool::service_func (void* arg) {
              sleep (
          1);
              printf (
          "finished task %d in thread %02X\n"*(int*) arg, (int)ACE_Thread::self ());
              
          return 0;
          }



          int thread_pool::svc (void{
              printf (
          "thread started\n");

              
          while (1)
              
          {                
                  ACE_Message_Block 
          *= 0;
                  ACE_Time_Value wait 
          = ACE_OS::gettimeofday ();
                  wait.sec (wait.sec () 
          + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
                  
                  
          if (this->getq (b, &wait) < 0{
                      
          if (this->thr_count () > minnum_) {
                          printf (
          "over task acquire\n");
                          pfree_thread_
          ->acquire ();
                          printf (
          "delete one thread, now %d\n"this->thr_count ()-1);
                          
                          
          return 0;
                      }
           else 
                          
          continue// I'm the one of last min number of threads
                  }


                  
          if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0{
                      printf (
          "queue task acquire\n");
                      pfree_thread_
          ->acquire (); // I'll use one free thread
                  }

                  
          else 
                      printf (
          "no need to acquire\n");

                  
          this->service_func ((void*)b->rd_ptr());
                                      
                  printf (
          "finished release\n");
                  b
          ->release();
                  
                  pfree_thread_
          ->release (); // added one free thread
              }


              
          return 0;
          }



          #ifdef THREAD_POOL_UNIT_TEST 
          int main (int argc, int ** argv) {
              printf (
          "begin test:\n");
          /*
              ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
              s->release (3);
              s->acquire ();
              s->acquire ();
              s->acquire ();
              printf ("ok");
              return 0;
          */
              
              thread_pool t;
              t.start_pool (
          10100);

              
          for (int i=0; i<200; i++{
                  t.add_task (
          &i, sizeof(i));
                  
          if (i % 20 == 0)
                      sleep (
          1);
              }


              sleep (
          1000);
              
              printf (
          "end test:\n");
              
          return 0;
          }


          #endif

          posted on 2007-08-14 17:56 我愛佳娃 閱讀(6097) 評論(4)  編輯  收藏 所屬分類: 自寫類庫

          評論:
          # re: C++實現(xiàn)的帶最大最小線程數(shù)的線程池(基于ACE) 2007-08-14 21:08 | pass86
          怎么寫道了BLOGJAVA.COM,不過學(xué)ACE是好的。  回復(fù)  更多評論
            
          # re: C++實現(xiàn)的帶最大最小線程數(shù)的線程池(基于ACE) 2007-08-16 23:35 | alwayscy
          嘿嘿,大部分BLOGJAVA的同學(xué)都只有一個技術(shù)博客吧,只要保證大部分與JAVA有關(guān)就好了。  回復(fù)  更多評論
            
          # re: C++實現(xiàn)的帶最大最小線程數(shù)的線程池(基于ACE) 2008-01-13 14:40 | liuruigong
          編譯錯誤修改
          1#include <ace/OS.h>
          2.ACE_OS::sleep();
          3.最好把主函數(shù)的sleep(2000) 修改為
          ACE_Thread_Manager::instance()->wait();

          這個線程池寫的不錯  回復(fù)  更多評論
            
          # re: C++實現(xiàn)的帶最大最小線程數(shù)的線程池(基于ACE)[未登錄] 2008-01-14 22:09 | 我愛佳娃
          以前搞C++,ACE是個不錯的框架,最近接觸了不少JAVA的東西,感覺JAVA這東西琳瑯滿目。  回復(fù)  更多評論
            

          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 荣昌县| 嘉祥县| 黔南| 耿马| 双牌县| 阳新县| 宜章县| 柞水县| 舒城县| 临沂市| 沂源县| 文山县| 左云县| 肇源县| 克什克腾旗| 横峰县| 赤壁市| 连城县| 延吉市| 鄂尔多斯市| 彰化县| 赫章县| 莱芜市| 温州市| 临猗县| 兴仁县| 宁波市| 田阳县| 和龙市| 西林县| 治多县| 凌海市| 楚雄市| 靖远县| 杨浦区| 登封市| 聊城市| 宁远县| 松滋市| 沙洋县| 开封市|