Vincent

          Vicent's blog
          隨筆 - 74, 文章 - 0, 評(píng)論 - 5, 引用 - 0
          數(shù)據(jù)加載中……

          一個(gè)線程池的實(shí)現(xiàn)

          設(shè)計(jì)目標(biāo)
          ?????提供一個(gè)線程池的組件,具有良好的伸縮性,當(dāng)線程夠用時(shí),銷(xiāo)毀不用線程,當(dāng)線程不夠用時(shí),自動(dòng)增加線程數(shù)量;
          ?????提供一個(gè)工作任務(wù)接口和工作隊(duì)列,實(shí)際所需要的任務(wù)都必須實(shí)現(xiàn)這個(gè)工作任務(wù)接口,然后放入工作隊(duì)列中;
          ?????線程池中的線程從工作隊(duì)列中,自動(dòng)取得工作任務(wù),執(zhí)行任務(wù)。
          主要控制類(lèi)和功能接口設(shè)計(jì)
          線程池管理器?ThreadPoolManager?的功能:
          ?????管理線程池中的各個(gè)屬性變量
          ü????最大工作線程數(shù)
          ü????最小工作線程數(shù)
          ü????激活的工作線程總數(shù)
          ü????睡眠的工作線程總數(shù)
          ü????工作線程總數(shù)?(即:激活的工作線程總數(shù)+睡眠的工作線程總數(shù))
          ?????創(chuàng)建工作線程
          ?????銷(xiāo)毀工作線程
          ?????啟動(dòng)處于睡眠的工作線程
          ?????睡眠處于激活的工作線程
          ?????縮任務(wù):當(dāng)工作線程總數(shù)小于或等于最小工作線程數(shù)時(shí),銷(xiāo)毀多余的睡眠的工作線程,使得現(xiàn)有工作線程總數(shù)等于最小工作任務(wù)總數(shù)
          ?????伸任務(wù):當(dāng)任務(wù)隊(duì)列任務(wù)總數(shù)大于工作線程數(shù)時(shí),增加工作線程總數(shù)至最大工作線程數(shù)
          ?????提供線程池啟動(dòng)接口
          ?????提供線程池銷(xiāo)毀接口
          工作線程?WorkThread??的功能:
          ?????從工作隊(duì)列取得工作任務(wù)
          ?????執(zhí)行工作任務(wù)接口中的指定任務(wù)
          工作任務(wù)接口?ITask???的功能:
          ?????提供指定任務(wù)動(dòng)作
          工作隊(duì)列?IWorkQueue??的功能:
          ?????提供獲取任務(wù)接口,并刪除工作隊(duì)列中的任務(wù);
          ?????提供加入任務(wù)接口;
          ?????提供刪除任務(wù)接口;
          ?????提供取得任務(wù)總數(shù)接口;
          ?????提供自動(dòng)填任務(wù)接口;(當(dāng)任務(wù)總數(shù)少于或等于默認(rèn)總數(shù)的25%時(shí),自動(dòng)裝填)
          ?????提供刪除所有任務(wù)接口;


          Code


          ThreadPoolManager:
          =====================================
          CODE:
          package test.thread.pool1;
          import java.util.ArrayList;
          import java.util.List;
          import test.thread.pool1.impl.MyWorkQueue;
          
          /**
           * <p>Title: 線程池管理器</p>
           * <p>Description: </p>
           * <p>Copyright: Copyright (c) 2005</p>
           * <p>Company: </p>
           * @author not attributable
           * @version 1.0
           */
          
          public class ThreadPoolManager {
            /*最大線程數(shù)*/
            private int threads_max_num;
          
            /*最小線程數(shù)*/
            private int threads_min_num;
            
            /* 線程池線程增長(zhǎng)步長(zhǎng) */
            private int threads_increase_step = 5;
          
            /* 任務(wù)工作隊(duì)列 */
            private IWorkQueue queue;
            
            /* 線程池監(jiān)視狗 */
            private PoolWatchDog poolWatchDog ;
            
            /* 隊(duì)列線程 */
            private Thread queueThread ;
            
            /* 線程池 封裝所有工作線程的數(shù)據(jù)結(jié)構(gòu) */
            private List pool = new ArrayList();
            
            /* 線程池中 封裝所有鈍化后的數(shù)據(jù)結(jié)構(gòu)*/
            private List passivePool = new ArrayList();
            
            /* 空閑60秒 */
            private static final long IDLE_TIMEOUT = 60000L;
            
            /* 關(guān)閉連接池標(biāo)志位 */
            private boolean close = false;
            
            /**
             * 線程池管理器
             * @param queue 任務(wù)隊(duì)列
             * @param threads_min_num 工作線程最小數(shù)
             * @param threads_max_num 工作線程最大數(shù)
             */
            public ThreadPoolManager(int threads_max_num
                                    ,int threads_min_num
                                    ,IWorkQueue queue){
              this.threads_max_num = threads_max_num;
              this.threads_min_num = threads_min_num;
              this.queue = queue;    
            }
          
            /**
             * 線程池啟動(dòng)
             */
            public void startPool(){
              System.out.println("=== startPool..........");
              poolWatchDog = new PoolWatchDog("PoolWatchDog");
              poolWatchDog.setDaemon(true);
              poolWatchDog.start();
              System.out.println("=== startPool..........over");
            }
          
            /**
             * 線程池銷(xiāo)毀接口
             */
            public void destoryPool(){
              System.out.println("==========================DestoryPool starting ...");
              this.close = true;
              int pool_size = this.pool.size();
              
              //中斷隊(duì)列線程
              System.out.println("===Interrupt queue thread ... ");
              queueThread.interrupt();
              queueThread = null;
              
              System.out.println("===Interrupt thread pool ... ");
              Thread pool_thread = null;
              for(int i=0; i<pool_size; i++){
                pool_thread = (Thread)pool.get(i);
                if(pool_thread !=null 
                && pool_thread.isAlive() 
                && !pool_thread.isInterrupted()){
                  pool_thread.interrupt();
                  System.out.println("Stop pool_thread:"
                                    +pool_thread.getName()+"[interrupt] "
                                    +pool_thread.isInterrupted());
                }
              }//end for
              
              if(pool != null){
                pool.clear();
              }
              if(passivePool != null){
                pool.clear();
              }
              
              try{
                System.out.println("=== poolWatchDog.join() starting ...");
                poolWatchDog.join();
                System.out.println("=== poolWatchDog.join() is over ...");
              }
              catch(Throwable ex){
                System.out.println("###poolWatchDog ... join method throw a exception ... "
                                    +ex.toString());
              }
              
              poolWatchDog =null;
              System.out.println("==============================DestoryPool is over ...");    
            }
            
            
            public static void main(String[] args) throws Exception{
              ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
              
              threadPoolManager1.startPool();
              Thread.sleep(60000);
              threadPoolManager1.destoryPool();
            }
            
            /**
             * 線程池監(jiān)視狗
             */
            private class PoolWatchDog extends Thread{
              public PoolWatchDog(String name){
                super(name);
              }
            
              public void run(){
                Thread workThread = null;
                Runnable run = null;
                
                //開(kāi)啟任務(wù)隊(duì)列線程,獲取數(shù)據(jù)--------
                System.out.println("===QueueThread starting ... ... ");
                queueThread = new Thread(new QueueThread(),"QueueThread");
                queueThread.start();
                
                System.out.println("===Initial thread Pool ... ...");
                //初始化線程池的最小線程數(shù),并放入池中
                for(int i=0; i<threads_min_num; i++){
                  run = new WorkThread();
                  workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
                  workThread.start();
                  if(i == threads_min_num -1){
                    workThread = null;
                    run = null;
                  }
                }
                System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());
          
                //線程池線程動(dòng)態(tài)增加線程算法--------------
                while(!close){
                
                  //等待5秒鐘,等上述線程都啟動(dòng)----------
                  synchronized(this){          
                    try{
                      System.out.println("===Wait the [last time] threads starting ....");
                      this.wait(15000);
                    }
                    catch(Throwable ex){
                      System.out.println("###PoolWatchDog invoking is failure ... "+ex);
                    }
                  }//end synchronized
                    
                  //開(kāi)始增加線程-----------------------spread動(dòng)作
                  int queue_size = queue.getTaskSize();
                  int temp_size = (queue_size - threads_min_num);
                  
                  if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
                    System.out.println("================Spread thread pool starting ....");
                    for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
                      System.out.println("=== Spread thread num : "+i);
                      run = new WorkThread();
                      workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
                      workThread.start();
                    }//end for
                    
                    workThread = null;
                    run = null;    
                    System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
                  }//end if
                    
                  //刪除已經(jīng)多余的睡眠線程-------------shrink動(dòng)作
                  int more_sleep_size = pool.size() - threads_min_num;//最多能刪除的線程數(shù)
                  int sleep_threads_size = passivePool.size();
                  if(more_sleep_size >0 && sleep_threads_size >0){
                    System.out.println("================Shrink thread pool starting ....");        
                    for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
                      System.out.println("=== Shrink thread num : "+i);
                      Thread removeThread = (Thread)passivePool.get(0);
                      if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
                        removeThread.interrupt();
                      }
                    }
                    System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());          
                  }
          
                  System.out.println("===End one return [shrink - spread operator] ....");    
                }//end while
              }//end run 
            }//end private class
            
            /**
             * 工作線程
             */
            class WorkThread implements Runnable{
            
              public WorkThread(){
              }
            
              public void run(){
                String name = Thread.currentThread().getName();
                System.out.println("===Thread.currentThread():"+name);
                pool.add(Thread.currentThread());    
              
                while(true){
                
                  //獲取任務(wù)---------
                  ITask task = null;
                  try{
                    System.out.println("===Get task from queue is starting ... ");
                    //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
                    if(Thread.currentThread().isInterrupted()){
                      System.out.println("===Breaking current thread and jump whlie [1] ... ");
                      break;
                    }
                    task = queue.getTask();
                  }
                  catch(Throwable ex){
                    System.out.println("###No task in queue:"+ex);
                  }//end tryc
                  
                  if(task != null){
                    //執(zhí)行任務(wù)---------
                    try{
                      System.out.println("===Execute the task is starting ... ");
                      //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
                      if(Thread.currentThread().isInterrupted()){
                        System.out.println("===Breaking current thread and jump whlie [1] ... ");
                        break;
                      }     
                      task.executeTask();
                      //任務(wù)執(zhí)行完畢-------
                      System.out.println("===Execute the task is over ... ");
                    }
                    catch(Throwable ex){
                      System.out.println("###Execute the task is failure ... "+ex);
                    }//end tryc
                    
                  }else{
                    //沒(méi)有任務(wù),則鈍化線程至規(guī)定時(shí)間--------
                    synchronized(this){
                      try{
                        System.out.println("===Passivate into passivePool ... ");
                        
                        //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)----
                        boolean isInterrupted = Thread.currentThread().isInterrupted();
                        if(isInterrupted){
                          System.out.println("===Breaking current thread and jump whlie [1] ... ");
                          break;
                        }
          //              passivePool.add(this);
                      passivePool.add(Thread.currentThread());
          
                        
                        //準(zhǔn)備睡眠線程-------
                        isInterrupted = Thread.currentThread().isInterrupted();
                        if(isInterrupted){
                          System.out.println("===Breaking current thread and jump whlie [2] ... ");
                          break;
                        }              
                        this.wait(IDLE_TIMEOUT);
                      }
                      catch(Throwable ex1){
                        System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
                        break;
                      }
                    }          
                  }        
                }//end while--------
                
                if(pool.contains(passivePool)){
                  pool.remove(this);
                }
                if(passivePool.contains(passivePool)){
                  passivePool.remove(this);
                }
                System.out.println("===The thread execute over ... "); 
              }//end run----------
            }
            
            
            class QueueThread implements Runnable{
            
              public QueueThread(){
              }
            
              public void run(){
                while(true){
                  //自動(dòng)裝在任務(wù)--------
                  queue.autoAddTask();
                  System.out.println("===The size of queue's task is "+queue.getTaskSize());
                
                  synchronized(this){
                    if(Thread.currentThread().isInterrupted()){
                      break;
                    }else{
                        try{
                          this.wait(queue.getLoadDataPollingTime());
                        }
                        catch(Throwable ex){
                          System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
                          break;
                        }
                    }//end if
                  }//end synchr
                  
                }//end while
              }//end run
            } 
          }
          






          WorkQueue
          =====================================
          CODE:
          package test.thread.pool1;
          
          import java.util.LinkedList;
          import test.thread.pool1.impl.MyTask;
          
          /**
           * <p>Title: 工作隊(duì)列對(duì)象 </p>
           * <p>Description: </p>
           * <p>Copyright: Copyright (c) 2005</p>
           * <p>Company: </p>
           * @author not attributable
           * @version 1.0
           */
          
          public abstract class WorkQueue implements IWorkQueue{
            /* 預(yù)計(jì)裝載量 */
            private int load_size;
            
            /* 數(shù)據(jù)裝載輪循時(shí)間 */
            private long load_polling_time;
            
            /* 隊(duì)列 */
            private LinkedList queue = new LinkedList();
            
            /**
             * 
             * @param load_size 預(yù)計(jì)裝載量
             * @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間
             */
            public WorkQueue(int load_size,long load_polling_time){
              this.load_size = (load_size <= 10) ? 10 : load_size;
              this.load_polling_time = load_polling_time;
            }
          
            /* 數(shù)據(jù)裝載輪循時(shí)間 */
            public long getLoadDataPollingTime(){
              return this.load_polling_time;
            }
          
          
            /*獲取任務(wù),并刪除隊(duì)列中的任務(wù)*/
            public synchronized ITask getTask(){
              ITask task = (ITask)queue.getFirst();
              queue.removeFirst();
              return task;
            }
          
            /*加入任務(wù)*/
            public void  addTask(ITask task){
              queue.addLast(task);
            }
          
            /*刪除任務(wù)*/
            public synchronized void removeTask(ITask task){
              queue.remove(task);
            }
          
            /*任務(wù)總數(shù)*/
            public synchronized int getTaskSize(){
              return queue.size();
            }
          
            /*自動(dòng)裝填任務(wù)*/
            public synchronized void autoAddTask(){
            
              synchronized(this){
                float load_size_auto = load_size - getTaskSize() / load_size;
                System.out.println("===load_size_auto:"+load_size_auto);
                
                if(load_size_auto > 0.25){        
                  autoAddTask0();
                }
                else {
                  System.out.println("=== Not must load new work queue ... Now! ");
                }    
              }
            }
          
            /*刪除所有任務(wù)*/
            public synchronized void clearAllTask(){
              queue.clear();
            }
            
            /**
             * 程序員自己實(shí)現(xiàn)該方法
             */
            protected abstract void autoAddTask0();
          }
          





          MyWorkQueue
          =====================================
          CODE:
          package test.thread.pool1.impl;
          
          import java.util.LinkedList;
          import test.thread.pool1.WorkQueue;
          
          /**
           * <p>Title: 例子工作隊(duì)列對(duì)象 </p>
           * <p>Description: </p>
           * <p>Copyright: Copyright (c) 2005</p>
           * <p>Company: </p>
           * @author not attributable
           * @version 1.0
           */
          
          public class MyWorkQueue extends WorkQueue{
          
            /**
             * @param load_size 預(yù)計(jì)裝載量
             * @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間
             */
            public MyWorkQueue(int load_size,long load_polling_time){
              super(load_size,load_polling_time);
            }
          
            /**
             * 自動(dòng)加載任務(wù)
             */
            protected synchronized void autoAddTask0(){
              //-------------------
              System.out.println("===MyWorkQueue ...  invoked autoAddTask0() method ...");
              for(int i=0; i<10; i++){
                System.out.println("===add task :"+i);
                this.addTask(new MyTask());
              }    
              //-------------------
            }
          }
          





          MyTask
          =====================================
          CODE:
          package test.thread.pool1.impl;
          import test.thread.pool1.ITask;
          
          /**
           * <p>Title: 工作任務(wù)接口 </p>
           * <p>Description: </p>
           * <p>Copyright: Copyright (c) 2005</p>
           * <p>Company: </p>
           * @author not attributable
           * @version 1.0
           */
          
          public class MyTask implements ITask {
          
            /**
             * 執(zhí)行的任務(wù)
             * @throws java.lang.Throwable
             */
            public void executeTask() throws Throwable{
              System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
            }
          }
          

          posted on 2006-08-24 16:55 Binary 閱讀(3733) 評(píng)論(2)  編輯  收藏 所屬分類(lèi): j2se

          評(píng)論

          # re: 一個(gè)線程池的實(shí)現(xiàn)  回復(fù)  更多評(píng)論   

          編譯時(shí),
          ITask出錯(cuò)
          IWorkQueue找不到.
          是不是弄少了段代碼?
          2007-03-05 23:29 | Peng

          # re: 一個(gè)線程池的實(shí)現(xiàn)  回復(fù)  更多評(píng)論   

          文章不錯(cuò),但是缺代碼了,作者能補(bǔ)全嗎?
          2008-05-25 23:30 | lindily
          主站蜘蛛池模板: 镇坪县| 新丰县| 秀山| 太保市| 尤溪县| 邹城市| 颍上县| 商城县| 三门县| 莒南县| 盘锦市| 军事| 玉林市| 东至县| 巴楚县| 巩义市| 清水河县| 维西| 宜黄县| 三穗县| 淮安市| 弥勒县| 天镇县| 阜南县| 丹巴县| 永寿县| 东乌珠穆沁旗| 子洲县| 个旧市| 哈巴河县| 临西县| 内黄县| 林州市| 老河口市| 柯坪县| 永顺县| 南阳市| 乐陵市| 奉贤区| 台东市| 通海县|