一個(gè)線程池的實(shí)現(xiàn)
設(shè)計(jì)目標(biāo)
?????提供一個(gè)線程池的組件,具有良好的伸縮性,當(dāng)線程夠用時(shí),銷(xiāo)毀不用線程,當(dāng)線程不夠用時(shí),自動(dòng)增加線程數(shù)量;
?????提供一個(gè)工作任務(wù)接口和工作隊(duì)列,實(shí)際所需要的任務(wù)都必須實(shí)現(xiàn)這個(gè)工作任務(wù)接口,然后放入工作隊(duì)列中;
?????線程池中的線程從工作隊(duì)列中,自動(dòng)取得工作任務(wù),執(zhí)行任務(wù)。
主要控制類(lèi)和功能接口設(shè)計(jì)
線程池管理器?ThreadPoolManager?的功能:
?????管理線程池中的各個(gè)屬性變量
ü????最大工作線程數(shù)
ü????最小工作線程數(shù)
ü????激活的工作線程總數(shù)
ü????睡眠的工作線程總數(shù)
ü????工作線程總數(shù)?(即:激活的工作線程總數(shù)+睡眠的工作線程總數(shù))
?????創(chuàng)建工作線程
?????銷(xiāo)毀工作線程
?????啟動(dòng)處于睡眠的工作線程
?????睡眠處于激活的工作線程
?????縮任務(wù):當(dāng)工作線程總數(shù)小于或等于最小工作線程數(shù)時(shí),銷(xiāo)毀多余的睡眠的工作線程,使得現(xiàn)有工作線程總數(shù)等于最小工作任務(wù)總數(shù)
?????伸任務(wù):當(dāng)任務(wù)隊(duì)列任務(wù)總數(shù)大于工作線程數(shù)時(shí),增加工作線程總數(shù)至最大工作線程數(shù)
?????提供線程池啟動(dòng)接口
?????提供線程池銷(xiāo)毀接口
工作線程?WorkThread??的功能:
?????從工作隊(duì)列取得工作任務(wù)
?????執(zhí)行工作任務(wù)接口中的指定任務(wù)
工作任務(wù)接口?ITask???的功能:
?????提供指定任務(wù)動(dòng)作
工作隊(duì)列?IWorkQueue??的功能:
?????提供獲取任務(wù)接口,并刪除工作隊(duì)列中的任務(wù);
?????提供加入任務(wù)接口;
?????提供刪除任務(wù)接口;
?????提供取得任務(wù)總數(shù)接口;
?????提供自動(dòng)填任務(wù)接口;(當(dāng)任務(wù)總數(shù)少于或等于默認(rèn)總數(shù)的25%時(shí),自動(dòng)裝填)
?????提供刪除所有任務(wù)接口;
Code
ThreadPoolManager:
=====================================
WorkQueue
=====================================
MyWorkQueue
=====================================
MyTask
=====================================
?????提供一個(gè)線程池的組件,具有良好的伸縮性,當(dāng)線程夠用時(shí),銷(xiāo)毀不用線程,當(dāng)線程不夠用時(shí),自動(dòng)增加線程數(shù)量;
?????提供一個(gè)工作任務(wù)接口和工作隊(duì)列,實(shí)際所需要的任務(wù)都必須實(shí)現(xiàn)這個(gè)工作任務(wù)接口,然后放入工作隊(duì)列中;
?????線程池中的線程從工作隊(duì)列中,自動(dòng)取得工作任務(wù),執(zhí)行任務(wù)。
主要控制類(lèi)和功能接口設(shè)計(jì)
線程池管理器?ThreadPoolManager?的功能:
?????管理線程池中的各個(gè)屬性變量
ü????最大工作線程數(shù)
ü????最小工作線程數(shù)
ü????激活的工作線程總數(shù)
ü????睡眠的工作線程總數(shù)
ü????工作線程總數(shù)?(即:激活的工作線程總數(shù)+睡眠的工作線程總數(shù))
?????創(chuàng)建工作線程
?????銷(xiāo)毀工作線程
?????啟動(dòng)處于睡眠的工作線程
?????睡眠處于激活的工作線程
?????縮任務(wù):當(dāng)工作線程總數(shù)小于或等于最小工作線程數(shù)時(shí),銷(xiāo)毀多余的睡眠的工作線程,使得現(xiàn)有工作線程總數(shù)等于最小工作任務(wù)總數(shù)
?????伸任務(wù):當(dāng)任務(wù)隊(duì)列任務(wù)總數(shù)大于工作線程數(shù)時(shí),增加工作線程總數(shù)至最大工作線程數(shù)
?????提供線程池啟動(dòng)接口
?????提供線程池銷(xiāo)毀接口
工作線程?WorkThread??的功能:
?????從工作隊(duì)列取得工作任務(wù)
?????執(zhí)行工作任務(wù)接口中的指定任務(wù)
工作任務(wù)接口?ITask???的功能:
?????提供指定任務(wù)動(dòng)作
工作隊(duì)列?IWorkQueue??的功能:
?????提供獲取任務(wù)接口,并刪除工作隊(duì)列中的任務(wù);
?????提供加入任務(wù)接口;
?????提供刪除任務(wù)接口;
?????提供取得任務(wù)總數(shù)接口;
?????提供自動(dòng)填任務(wù)接口;(當(dāng)任務(wù)總數(shù)少于或等于默認(rèn)總數(shù)的25%時(shí),自動(dòng)裝填)
?????提供刪除所有任務(wù)接口;
Code
ThreadPoolManager:
=====================================
CODE:package test.thread.pool1; import java.util.ArrayList; import java.util.List; import test.thread.pool1.impl.MyWorkQueue; /** * <p>Title: 線程池管理器</p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public class ThreadPoolManager { /*最大線程數(shù)*/ private int threads_max_num; /*最小線程數(shù)*/ private int threads_min_num; /* 線程池線程增長(zhǎng)步長(zhǎng) */ private int threads_increase_step = 5; /* 任務(wù)工作隊(duì)列 */ private IWorkQueue queue; /* 線程池監(jiān)視狗 */ private PoolWatchDog poolWatchDog ; /* 隊(duì)列線程 */ private Thread queueThread ; /* 線程池 封裝所有工作線程的數(shù)據(jù)結(jié)構(gòu) */ private List pool = new ArrayList(); /* 線程池中 封裝所有鈍化后的數(shù)據(jù)結(jié)構(gòu)*/ private List passivePool = new ArrayList(); /* 空閑60秒 */ private static final long IDLE_TIMEOUT = 60000L; /* 關(guān)閉連接池標(biāo)志位 */ private boolean close = false; /** * 線程池管理器 * @param queue 任務(wù)隊(duì)列 * @param threads_min_num 工作線程最小數(shù) * @param threads_max_num 工作線程最大數(shù) */ public ThreadPoolManager(int threads_max_num ,int threads_min_num ,IWorkQueue queue){ this.threads_max_num = threads_max_num; this.threads_min_num = threads_min_num; this.queue = queue; } /** * 線程池啟動(dòng) */ public void startPool(){ System.out.println("=== startPool.........."); poolWatchDog = new PoolWatchDog("PoolWatchDog"); poolWatchDog.setDaemon(true); poolWatchDog.start(); System.out.println("=== startPool..........over"); } /** * 線程池銷(xiāo)毀接口 */ public void destoryPool(){ System.out.println("==========================DestoryPool starting ..."); this.close = true; int pool_size = this.pool.size(); //中斷隊(duì)列線程 System.out.println("===Interrupt queue thread ... "); queueThread.interrupt(); queueThread = null; System.out.println("===Interrupt thread pool ... "); Thread pool_thread = null; for(int i=0; i<pool_size; i++){ pool_thread = (Thread)pool.get(i); if(pool_thread !=null && pool_thread.isAlive() && !pool_thread.isInterrupted()){ pool_thread.interrupt(); System.out.println("Stop pool_thread:" +pool_thread.getName()+"[interrupt] " +pool_thread.isInterrupted()); } }//end for if(pool != null){ pool.clear(); } if(passivePool != null){ pool.clear(); } try{ System.out.println("=== poolWatchDog.join() starting ..."); poolWatchDog.join(); System.out.println("=== poolWatchDog.join() is over ..."); } catch(Throwable ex){ System.out.println("###poolWatchDog ... join method throw a exception ... " +ex.toString()); } poolWatchDog =null; System.out.println("==============================DestoryPool is over ..."); } public static void main(String[] args) throws Exception{ ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000)); threadPoolManager1.startPool(); Thread.sleep(60000); threadPoolManager1.destoryPool(); } /** * 線程池監(jiān)視狗 */ private class PoolWatchDog extends Thread{ public PoolWatchDog(String name){ super(name); } public void run(){ Thread workThread = null; Runnable run = null; //開(kāi)啟任務(wù)隊(duì)列線程,獲取數(shù)據(jù)-------- System.out.println("===QueueThread starting ... ... "); queueThread = new Thread(new QueueThread(),"QueueThread"); queueThread.start(); System.out.println("===Initial thread Pool ... ..."); //初始化線程池的最小線程數(shù),并放入池中 for(int i=0; i<threads_min_num; i++){ run = new WorkThread(); workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i); workThread.start(); if(i == threads_min_num -1){ workThread = null; run = null; } } System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size()); //線程池線程動(dòng)態(tài)增加線程算法-------------- while(!close){ //等待5秒鐘,等上述線程都啟動(dòng)---------- synchronized(this){ try{ System.out.println("===Wait the [last time] threads starting ...."); this.wait(15000); } catch(Throwable ex){ System.out.println("###PoolWatchDog invoking is failure ... "+ex); } }//end synchronized //開(kāi)始增加線程-----------------------spread動(dòng)作 int queue_size = queue.getTaskSize(); int temp_size = (queue_size - threads_min_num); if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){ System.out.println("================Spread thread pool starting ...."); for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){ System.out.println("=== Spread thread num : "+i); run = new WorkThread(); workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i); workThread.start(); }//end for workThread = null; run = null; System.out.println("===Spread thread pool is over .... and pool size:"+pool.size()); }//end if //刪除已經(jīng)多余的睡眠線程-------------shrink動(dòng)作 int more_sleep_size = pool.size() - threads_min_num;//最多能刪除的線程數(shù) int sleep_threads_size = passivePool.size(); if(more_sleep_size >0 && sleep_threads_size >0){ System.out.println("================Shrink thread pool starting ...."); for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){ System.out.println("=== Shrink thread num : "+i); Thread removeThread = (Thread)passivePool.get(0); if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){ removeThread.interrupt(); } } System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size()); } System.out.println("===End one return [shrink - spread operator] ...."); }//end while }//end run }//end private class /** * 工作線程 */ class WorkThread implements Runnable{ public WorkThread(){ } public void run(){ String name = Thread.currentThread().getName(); System.out.println("===Thread.currentThread():"+name); pool.add(Thread.currentThread()); while(true){ //獲取任務(wù)--------- ITask task = null; try{ System.out.println("===Get task from queue is starting ... "); //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)---- if(Thread.currentThread().isInterrupted()){ System.out.println("===Breaking current thread and jump whlie [1] ... "); break; } task = queue.getTask(); } catch(Throwable ex){ System.out.println("###No task in queue:"+ex); }//end tryc if(task != null){ //執(zhí)行任務(wù)--------- try{ System.out.println("===Execute the task is starting ... "); //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)---- if(Thread.currentThread().isInterrupted()){ System.out.println("===Breaking current thread and jump whlie [1] ... "); break; } task.executeTask(); //任務(wù)執(zhí)行完畢------- System.out.println("===Execute the task is over ... "); } catch(Throwable ex){ System.out.println("###Execute the task is failure ... "+ex); }//end tryc }else{ //沒(méi)有任務(wù),則鈍化線程至規(guī)定時(shí)間-------- synchronized(this){ try{ System.out.println("===Passivate into passivePool ... "); //看線程是否被中斷,如果被中斷停止執(zhí)行任務(wù)---- boolean isInterrupted = Thread.currentThread().isInterrupted(); if(isInterrupted){ System.out.println("===Breaking current thread and jump whlie [1] ... "); break; } // passivePool.add(this); passivePool.add(Thread.currentThread()); //準(zhǔn)備睡眠線程------- isInterrupted = Thread.currentThread().isInterrupted(); if(isInterrupted){ System.out.println("===Breaking current thread and jump whlie [2] ... "); break; } this.wait(IDLE_TIMEOUT); } catch(Throwable ex1){ System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1); break; } } } }//end while-------- if(pool.contains(passivePool)){ pool.remove(this); } if(passivePool.contains(passivePool)){ passivePool.remove(this); } System.out.println("===The thread execute over ... "); }//end run---------- } class QueueThread implements Runnable{ public QueueThread(){ } public void run(){ while(true){ //自動(dòng)裝在任務(wù)-------- queue.autoAddTask(); System.out.println("===The size of queue's task is "+queue.getTaskSize()); synchronized(this){ if(Thread.currentThread().isInterrupted()){ break; }else{ try{ this.wait(queue.getLoadDataPollingTime()); } catch(Throwable ex){ System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex); break; } }//end if }//end synchr }//end while }//end run } }
WorkQueue
=====================================
CODE:package test.thread.pool1; import java.util.LinkedList; import test.thread.pool1.impl.MyTask; /** * <p>Title: 工作隊(duì)列對(duì)象 </p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public abstract class WorkQueue implements IWorkQueue{ /* 預(yù)計(jì)裝載量 */ private int load_size; /* 數(shù)據(jù)裝載輪循時(shí)間 */ private long load_polling_time; /* 隊(duì)列 */ private LinkedList queue = new LinkedList(); /** * * @param load_size 預(yù)計(jì)裝載量 * @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間 */ public WorkQueue(int load_size,long load_polling_time){ this.load_size = (load_size <= 10) ? 10 : load_size; this.load_polling_time = load_polling_time; } /* 數(shù)據(jù)裝載輪循時(shí)間 */ public long getLoadDataPollingTime(){ return this.load_polling_time; } /*獲取任務(wù),并刪除隊(duì)列中的任務(wù)*/ public synchronized ITask getTask(){ ITask task = (ITask)queue.getFirst(); queue.removeFirst(); return task; } /*加入任務(wù)*/ public void addTask(ITask task){ queue.addLast(task); } /*刪除任務(wù)*/ public synchronized void removeTask(ITask task){ queue.remove(task); } /*任務(wù)總數(shù)*/ public synchronized int getTaskSize(){ return queue.size(); } /*自動(dòng)裝填任務(wù)*/ public synchronized void autoAddTask(){ synchronized(this){ float load_size_auto = load_size - getTaskSize() / load_size; System.out.println("===load_size_auto:"+load_size_auto); if(load_size_auto > 0.25){ autoAddTask0(); } else { System.out.println("=== Not must load new work queue ... Now! "); } } } /*刪除所有任務(wù)*/ public synchronized void clearAllTask(){ queue.clear(); } /** * 程序員自己實(shí)現(xiàn)該方法 */ protected abstract void autoAddTask0(); }
MyWorkQueue
=====================================
CODE:package test.thread.pool1.impl; import java.util.LinkedList; import test.thread.pool1.WorkQueue; /** * <p>Title: 例子工作隊(duì)列對(duì)象 </p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public class MyWorkQueue extends WorkQueue{ /** * @param load_size 預(yù)計(jì)裝載量 * @param load_polling_time 數(shù)據(jù)裝載輪循時(shí)間 */ public MyWorkQueue(int load_size,long load_polling_time){ super(load_size,load_polling_time); } /** * 自動(dòng)加載任務(wù) */ protected synchronized void autoAddTask0(){ //------------------- System.out.println("===MyWorkQueue ... invoked autoAddTask0() method ..."); for(int i=0; i<10; i++){ System.out.println("===add task :"+i); this.addTask(new MyTask()); } //------------------- } }
MyTask
=====================================
CODE:package test.thread.pool1.impl; import test.thread.pool1.ITask; /** * <p>Title: 工作任務(wù)接口 </p> * <p>Description: </p> * <p>Copyright: Copyright (c) 2005</p> * <p>Company: </p> * @author not attributable * @version 1.0 */ public class MyTask implements ITask { /** * 執(zhí)行的任務(wù) * @throws java.lang.Throwable */ public void executeTask() throws Throwable{ System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... "); } }
posted on 2006-08-24 16:55 Binary 閱讀(3733) 評(píng)論(2) 編輯 收藏 所屬分類(lèi): j2se