第二步,是實現一個線程池
因為之前看書的時候留了個心眼,看線程池相關的內容的時候特意沒去研究JDK的實現
因為學跟做不是一碼事,寫一個線程池,算是給自己看完并發實踐這書的一個練習吧
廢話不多說,練習開始
首先,整理一下要實現的功能,或者說要注意的元素
1.實現Executor接口
2.實現一個等待隊列(可否配置,優先級等)
3.是否需要預啟動線程(可否配置)
4.執行開始前,結束后,需要留接口
5.Runable在任務中的存放形式
6.線程的啟動,喚醒
7.線程池的關閉(優雅地結束),需要線程提供中斷,線程池提供給使用者的提示,線程池返回取消線程等
8.線程隊列(空閑隊列?)
9.取空閑線程算法(任務入隊時?線程執行一個任務結束時?)
10.將所有需要同步的地方盡量使用非阻塞算法(通過偵察,更新一個原子變量實現)
11.減少線程切換開銷(輪詢是否有任務,n微秒后再進入等待)
暫時就考慮到這些,剩下的以后再補
總的來說,計劃寫n個版本(畢竟是第二次動手,寫一個龐大的需要細致考慮的東西功力還差遠呢,只能從最簡單的,最方便的實現開始,然后慢慢加強)
測試先行:
線程池代碼:
第一版本的目標很簡單,只要能跑,沒死鎖,就是完勝
可惜結果很讓人絕望~
寫完了,調了近3個小時,仍然沒發現問題,最后加了一堆輸出,又加了多個鎖,終于勉勉強強跑起來了……
并發的調試真難,debug完全沒用,看輸出又看不出什么來,只能是一遍一遍地檢查代碼,寫下一個版本前先找點資料,研究下調試方法吧
后來發現錯誤是一個簡單的i++……
第一版本就寫成這樣了,之后再慢慢加強了,畢竟學java第一個玩意兒也是helloworld,原諒自己了
已有 0 人發表留言,猛擊->>這里<<-參與討論
ItEye推薦
因為之前看書的時候留了個心眼,看線程池相關的內容的時候特意沒去研究JDK的實現
因為學跟做不是一碼事,寫一個線程池,算是給自己看完并發實踐這書的一個練習吧
廢話不多說,練習開始
首先,整理一下要實現的功能,或者說要注意的元素
1.實現Executor接口
2.實現一個等待隊列(可否配置,優先級等)
3.是否需要預啟動線程(可否配置)
4.執行開始前,結束后,需要留接口
5.Runable在任務中的存放形式
6.線程的啟動,喚醒
7.線程池的關閉(優雅地結束),需要線程提供中斷,線程池提供給使用者的提示,線程池返回取消線程等
8.線程隊列(空閑隊列?)
9.取空閑線程算法(任務入隊時?線程執行一個任務結束時?)
10.將所有需要同步的地方盡量使用非阻塞算法(通過偵察,更新一個原子變量實現)
11.減少線程切換開銷(輪詢是否有任務,n微秒后再進入等待)
暫時就考慮到這些,剩下的以后再補
總的來說,計劃寫n個版本(畢竟是第二次動手,寫一個龐大的需要細致考慮的東西功力還差遠呢,只能從最簡單的,最方便的實現開始,然后慢慢加強)
測試先行:
public static void main(String[] args) throws InterruptedException { testEasyRunnableThreadPool(new ThreadPoolTest1(10), 10000, 10); testEasyRunnableThreadPool(Executors.newFixedThreadPool(10), 10000, 10); } /** * 一個產生隨機數的方法,防止jvm優化 * @param seed * @return */ static int getRandomNum(int seed) { seed ^= (seed << 6); seed ^= (seed >>> 21); seed ^= (seed << 7); return seed; } /** * 執行一個簡單的計算,只占用cpu,沒有io和其他阻塞的方法 * @param pool * @param tryTime * @param threadNum * @throws InterruptedException */ static void testEasyRunnableThreadPool(Executor pool,int tryTime,int threadNum) throws InterruptedException { //construct runnable Runnable command = new Runnable() { public void run() { final int addTime = 1000000; long sum = 0; int temp = this.hashCode() ^ (int)System.currentTimeMillis(); for(int i = 0;i<addTime;i++) { sum += (temp = getRandomNum(temp)); } } }; testThreadPool(tryTime, pool, command); } /** * * @param tryNum * @param pool * @param commandList * @throws InterruptedException */ static void testThreadPool(int tryNum,Executor pool,final Runnable command) throws InterruptedException { final CountDownLatch latch = new CountDownLatch(tryNum); Runnable wrapper = new Runnable() { public void run() { command.run(); //想測試并發,在并發中加入適當的同步操作是無法避免的,只能減少 //,在這,只是做了一個簡單的countdown,影響不大 latch.countDown(); } }; long startTime = System.nanoTime(); for(int i = 0;i<tryNum;i++) { pool.execute(wrapper); } latch.await(); long endTime = System.nanoTime(); System.out.println(endTime-startTime); }
線程池代碼:
第一版本的目標很簡單,只要能跑,沒死鎖,就是完勝
可惜結果很讓人絕望~
寫完了,調了近3個小時,仍然沒發現問題,最后加了一堆輸出,又加了多個鎖,終于勉勉強強跑起來了……
并發的調試真難,debug完全沒用,看輸出又看不出什么來,只能是一遍一遍地檢查代碼,寫下一個版本前先找點資料,研究下調試方法吧
后來發現錯誤是一個簡單的i++……
public class ThreadPoolTest1 implements Executor { //等待隊列 Queue<Runnable> waitingQueue = null; ConcurrentLinkedQueue<ThreadNode> freeThread; //相當于一個freeThread的狀態,根據狀態決定行為,原則上將freeThread.size()+busyThreadsNum=MAXTHREADNUM private AtomicInteger busyThreadsNum = new AtomicInteger(0); //最大線程數 final int MAXTHREADNUM; public ThreadPoolTest1 (int threadNum) { this.MAXTHREADNUM = threadNum; init(MAXTHREADNUM,new ConcurrentLinkedQueue<Runnable>()); } private void init(int threadNum,ConcurrentLinkedQueue<Runnable> queue) { freeThread = new ConcurrentLinkedQueue<ThreadNode>(); waitingQueue = queue; //初始化空線程,一開始不是這樣實現的,后來發現一堆問題,暫時只能先加鎖,無論怎么樣,跑起來再說把 synchronized(this) { for(int i = 0;i<threadNum;i++) { ThreadNode node = new ThreadNode(); busyThreadsNum.incrementAndGet(); node.start(); } } } private synchronized void threadExecute(Runnable command) { //用了個挺弱智的非阻塞算法 for(;;) { //得到開始的值 int expect = busyThreadsNum.get(); if(expect == MAXTHREADNUM) { waitingQueue.add(command); return; } else { //比較并設置,如果失敗,重來 if(busyThreadsNum.compareAndSet(expect, ++expect))//之前寫的是expect++,檢查了n久,硬是看不出啥問題,只能怪自己天資愚魯吧 { ThreadNode t = freeThread.remove(); t.setCommand(command); synchronized(t) {t.notify();} return; } else continue; } } } private class ThreadNode extends Thread { Runnable command = null; Exception e = null; Exception getException() { return e; } void setCommand(Runnable c) { command = c; } @Override public void run() { try { for(;;) { if(command == null) { ThreadPoolTest1.this.waitThread(this); } command.run(); command = ThreadPoolTest1.this.getCommand(); } }catch (InterruptedException e) { } } } Runnable getCommand() throws InterruptedException { return waitingQueue.poll(); } void waitThread(Thread t) throws InterruptedException { synchronized(this) { freeThread.add((ThreadNode) t); busyThreadsNum.decrementAndGet(); } synchronized(t) { t.wait(); } } protected void beforeExecute() { } public void execute(Runnable command) { beforeExecute(); threadExecute(command); afterExecute(); } protected void afterExecute() { } }
第一版本就寫成這樣了,之后再慢慢加強了,畢竟學java第一個玩意兒也是helloworld,原諒自己了
已有 0 人發表留言,猛擊->>這里<<-參與討論
ItEye推薦