java并發學習之二:線程池

          Posted on 2011-03-23 20:41 snake 閱讀(170) 評論(0)  編輯  收藏
          第二步,是實現一個線程池

          因為之前看書的時候留了個心眼,看線程池相關的內容的時候特意沒去研究JDK的實現
          因為學跟做不是一碼事,寫一個線程池,算是給自己看完并發實踐這書的一個練習吧

          廢話不多說,練習開始

          首先,整理一下要實現的功能,或者說要注意的元素
          1.實現Executor接口
          2.實現一個等待隊列(可否配置,優先級等)
          3.是否需要預啟動線程(可否配置)
          4.執行開始前,結束后,需要留接口
          5.Runable在任務中的存放形式
          6.線程的啟動,喚醒
          7.線程池的關閉(優雅地結束),需要線程提供中斷,線程池提供給使用者的提示,線程池返回取消線程等
          8.線程隊列(空閑隊列?)
          9.取空閑線程算法(任務入隊時?線程執行一個任務結束時?)
          10.將所有需要同步的地方盡量使用非阻塞算法(通過偵察,更新一個原子變量實現)
          11.減少線程切換開銷(輪詢是否有任務,n微秒后再進入等待)

          暫時就考慮到這些,剩下的以后再補

          總的來說,計劃寫n個版本(畢竟是第二次動手,寫一個龐大的需要細致考慮的東西功力還差遠呢,只能從最簡單的,最方便的實現開始,然后慢慢加強)

          測試先行:
          	public static void main(String[] args) throws InterruptedException
          	{
          		testEasyRunnableThreadPool(new ThreadPoolTest1(10), 10000, 10);
          		testEasyRunnableThreadPool(Executors.newFixedThreadPool(10), 10000, 10);
          	}
          
          	/**
          	 * 一個產生隨機數的方法,防止jvm優化
          	 * @param seed
          	 * @return
          	 */
          	static int getRandomNum(int seed)
          	{
          		seed ^= (seed << 6);
          		seed ^= (seed >>> 21);
          		seed ^= (seed << 7);
          		return seed;
          	}
          	
          	/**
          	 * 執行一個簡單的計算,只占用cpu,沒有io和其他阻塞的方法
          	 * @param pool
          	 * @param tryTime
          	 * @param threadNum
          	 * @throws InterruptedException
          	 */
          	static void testEasyRunnableThreadPool(Executor pool,int tryTime,int threadNum) throws InterruptedException
          	{
          		//construct runnable
          		Runnable command = new Runnable() {			
          			public void run() {
          				final int addTime = 1000000;
          				long sum = 0;
          				int temp = this.hashCode() ^ (int)System.currentTimeMillis();
          				for(int i = 0;i<addTime;i++)
          				{
          					sum += (temp = getRandomNum(temp));
          				}				
          			}
          		};		
          		testThreadPool(tryTime, pool, command);
          	}
          	
          	/**
          	 * 
          	 * @param tryNum 
          	 * @param pool
          	 * @param commandList
          	 * @throws InterruptedException
          	 */
          	static void testThreadPool(int tryNum,Executor pool,final Runnable command) throws InterruptedException
          	{
          		final CountDownLatch latch = new CountDownLatch(tryNum);
          		Runnable wrapper = new Runnable() {			
          			public void run() {
          				command.run();
          				//想測試并發,在并發中加入適當的同步操作是無法避免的,只能減少
          				//,在這,只是做了一個簡單的countdown,影響不大
          				latch.countDown();
          			}
          		};
          		long startTime = System.nanoTime();
          		for(int i = 0;i<tryNum;i++)
          		{
          			pool.execute(wrapper);
          		}		
          		latch.await();
          		long endTime = System.nanoTime();
          		System.out.println(endTime-startTime);
          	}
          
          



          線程池代碼:
          第一版本的目標很簡單,只要能跑,沒死鎖,就是完勝
          可惜結果很讓人絕望~
          寫完了,調了近3個小時,仍然沒發現問題,最后加了一堆輸出,又加了多個鎖,終于勉勉強強跑起來了……
          并發的調試真難,debug完全沒用,看輸出又看不出什么來,只能是一遍一遍地檢查代碼,寫下一個版本前先找點資料,研究下調試方法吧
          后來發現錯誤是一個簡單的i++……
          public class ThreadPoolTest1 implements Executor {
          	//等待隊列
          	Queue<Runnable> waitingQueue = null;
          	
          	ConcurrentLinkedQueue<ThreadNode> freeThread;
          	//相當于一個freeThread的狀態,根據狀態決定行為,原則上將freeThread.size()+busyThreadsNum=MAXTHREADNUM
          	private AtomicInteger busyThreadsNum = new AtomicInteger(0);
          	//最大線程數
          	final int MAXTHREADNUM;
          	
          	public ThreadPoolTest1 (int threadNum)
          	{
          		this.MAXTHREADNUM = threadNum;
          		init(MAXTHREADNUM,new ConcurrentLinkedQueue<Runnable>());
          	}	
          
          	private void init(int threadNum,ConcurrentLinkedQueue<Runnable> queue)
          	{
          		freeThread = new ConcurrentLinkedQueue<ThreadNode>();
          		waitingQueue = queue;
          		//初始化空線程,一開始不是這樣實現的,后來發現一堆問題,暫時只能先加鎖,無論怎么樣,跑起來再說把
          		synchronized(this)
          		{
          			for(int i = 0;i<threadNum;i++)
          			{
          				ThreadNode node = new ThreadNode();
          				busyThreadsNum.incrementAndGet();				
          				node.start();
          			}
          		}
          	}
          	
          	private synchronized void threadExecute(Runnable command)
          	{
          		//用了個挺弱智的非阻塞算法
          		for(;;)
          		{
          			//得到開始的值
          			int expect = busyThreadsNum.get();
          			if(expect == MAXTHREADNUM)
          			{
          				waitingQueue.add(command);
          				return;
          			}
          			else
          			{
          				//比較并設置,如果失敗,重來
          				if(busyThreadsNum.compareAndSet(expect, ++expect))//之前寫的是expect++,檢查了n久,硬是看不出啥問題,只能怪自己天資愚魯吧
          				{
          					ThreadNode t = freeThread.remove();
          					t.setCommand(command);
          					synchronized(t)
          					{t.notify();}
          					return;
          				}
          				else
          					continue;
          			}
          		}
          
          	}
          	
          	private class ThreadNode extends Thread
          	{
          		Runnable command = null;
          		Exception e = null;
          		
          		Exception getException()
          		{
          			return e;
          		}
          		
          		void setCommand(Runnable c)
          		{
          			command = c;
          		}
          		
          		
          		@Override
          		public void run() {
          			try {
          				for(;;)
          				{
          					if(command == null)
          					{					
          						ThreadPoolTest1.this.waitThread(this);					
          					}
          					command.run();				
          					command = ThreadPoolTest1.this.getCommand();			
          				}
          			}catch (InterruptedException e) {
          			}
          		}
          	}
          	
          	Runnable getCommand() throws InterruptedException
          	{
          		return waitingQueue.poll();
          	}
          	
          	void waitThread(Thread t) throws InterruptedException
          	{
          		synchronized(this)
          		{		
          			freeThread.add((ThreadNode) t);
          			busyThreadsNum.decrementAndGet();
          		}
          		synchronized(t)
          		{
          			t.wait();					
          		}
          
          	}
          	
          	protected void beforeExecute()
          	{
          		
          	}
          
          	public void execute(Runnable command) {
          		beforeExecute();		
          		threadExecute(command);
          		afterExecute();
          
          	}
          	
          	protected void afterExecute()
          	{
          		
          	}
          }
          


          第一版本就寫成這樣了,之后再慢慢加強了,畢竟學java第一個玩意兒也是helloworld,原諒自己了

          已有 0 人發表留言,猛擊->>這里<<-參與討論


          ItEye推薦




          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 玉树县| 怀化市| 浦北县| 吴桥县| 慈溪市| 疏附县| 兰溪市| 竹山县| 石楼县| 惠水县| 肃宁县| 克什克腾旗| 永昌县| 雷州市| 皮山县| 台东市| 扎赉特旗| 东宁县| 娄烦县| 盖州市| 罗平县| 梅州市| 卢湾区| 罗城| 康乐县| 社旗县| 曲沃县| 五峰| 金堂县| 木兰县| 盈江县| 沈阳市| 夹江县| 武定县| 醴陵市| 泸溪县| 彰化县| 河间市| 洪泽县| 原平市| 交城县|