[置頂]線程池代碼
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.omg.CORBA.TIMEOUT;
import com.nio.test.ReadFileThread;
public class ThreadPoolManager {
private static ThreadPoolManager tpm = new ThreadPoolManager();
// 線程池最小線程數(shù)
private final static int MIN_SIZE = 4;
// 線程池最大線程數(shù)
private final static int MAX_SIZE = 10;
// 線程池維護線程允許的空閑限制
private final static int KEEP_ACTIVE_TIME = 0;
// 線程池用的緩沖隊列大小
private final static int WORK_QUEUE_SIZE = 10;
// 消息緩沖隊列
Queue queue = new LinkedList();
final Runnable accessBuffeThread = new Runnable()
{
public void run() {
if( hasMoreAcquire() ){
String msg = ( String ) queue.poll();
Runnable task = new AccessDBThread( msg );
threadpool.execute( task );
}
}
};
// 無法由 ThreadPoolExecutor 執(zhí)行的任務的處理程序
final RejectedExecutionHandler handler = new RejectedExecutionHandler()
{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// TODO Auto-generated method stub
System.out.println(((AccessDBThread )r).getMsg()+"消息放入隊列中重新等待執(zhí)行");
queue.offer((( AccessDBThread ) r ).getMsg() );
}
};
final ThreadPoolExecutor threadpool = new ThreadPoolExecutor(MIN_SIZE, MAX_SIZE, KEEP_ACTIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE),this.handler);
// 調(diào)度線程池
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBuffeThread, 0, 1, TimeUnit.SECONDS);
public static ThreadPoolManager newinstance()
{
return tpm;
}
private ThreadPoolManager (){}
private boolean hasMoreAcquire()
{
return !queue.isEmpty();
}
public void addLogMsg(String msg)
{
Runnable task = new AccessDBThread(msg);
threadpool.execute(task);
}
}
public class AccessDBThread implements Runnable{
private String msg;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public AccessDBThread(){
super();
}
public AccessDBThread(String msg)
{
this.msg=msg;
}
public void run() {
// TODO Auto-generated method stub
System.out.println("Added the message: "+msg+" into the Database");
}
}
測試類:
public class TestDriver {
ThreadPoolManager tpm = ThreadPoolManager.newinstance();
public void addMsg(String msg)
{
tpm.addLogMsg(msg);
}
public static void main(String[] args) {
for(int i=0;i<100;i++)
{
new TestDriver().addMsg(Integer.toString(i));
}
}
}
posted @ 2011-09-02 15:20 crazy-李陽 閱讀(418) | 評論 (0) | 編輯 收藏