qileilove

          blog已經(jīng)轉(zhuǎn)移至github,大家請訪問 http://qaseven.github.io/

          Java多線程設(shè)計模式之線程池模式

           前序:

            Thread-Per-Message Pattern,是一種對于每個命令或請求,都分配一個線程,由這個線程執(zhí)行工作。它將“委托消息的一端”和“執(zhí)行消息的一端”用兩個不同的線程來實(shí)現(xiàn)。該線程模式主要包括三個部分:

            1、Request參與者(委托人),也就是消息發(fā)送端或者命令請求端

            2、Host參與者,接受消息的請求,負(fù)責(zé)為每個消息分配一個工作線程。

            3、Worker參與者,具體執(zhí)行Request參與者的任務(wù)的線程,由Host參與者來啟動。

            由于常規(guī)調(diào)用一個方法后,必須等待該方法完全執(zhí)行完畢后才能繼續(xù)執(zhí)行下一步操作,而利用線程后,就不必等待具體任務(wù)執(zhí)行完畢,就可以馬上返回繼續(xù)執(zhí)行下一步操作。

            背景:

            由于在Thread-Per-Message Pattern中對于每一個請求都會生成啟動一個線程,而線程的啟動是很花費(fèi)時間的工作,所以鑒于此,提出了Worker Thread,重復(fù)利用已經(jīng)啟動的線程。

            線程池:

            Worker Thread,也稱為工人線程或背景線程,不過一般都稱為線程池。該模式主要在于,事先啟動一定數(shù)目的工作線程。當(dāng)沒有請求工作的時候,所有的工人線程都會等待新的請求過來,一旦有工作到達(dá),就馬上從線程池中喚醒某個線程來執(zhí)行任務(wù),執(zhí)行完畢后繼續(xù)在線程池中等待任務(wù)池的工作請求的到達(dá)。

            任務(wù)池:主要是存儲接受請求的集合,利用它可以緩沖接受到的請求,可以設(shè)置大小來表示同時能夠接受最大請求數(shù)目。這個任務(wù)池主要是供線程池來訪問。

            線程池:這個是工作線程所在的集合,可以通過設(shè)置它的大小來提供并發(fā)處理的工作量。對于線程池的大小,可以事先生成一定數(shù)目的線程,根據(jù)實(shí)際情況來動態(tài)增加或者減少線程數(shù)目。線程池的大小不是越大越好,線程的切換也會耗時的。

            存放池的數(shù)據(jù)結(jié)構(gòu),可以用數(shù)組也可以利用集合,在集合類中一般使用Vector,這個是線程安全的。

            Worker Thread的所有參與者:

            1、Client參與者,發(fā)送Request的參與者

            2、Channel參與者,負(fù)責(zé)緩存Request的請求,初始化啟動線程,分配工作線程

            3、Worker參與者,具體執(zhí)行Request的工作線程

            4、Request參與者

            注意:將在Worker線程內(nèi)部等待任務(wù)池非空的方式稱為正向等待。

            將在Channel線程提供Worker線程來判斷任務(wù)池非空的方式稱為反向等待。

            線程池實(shí)例1:

            利用同步方法來實(shí)現(xiàn),使用數(shù)組來作為任務(wù)池的存放數(shù)據(jù)結(jié)構(gòu)。在Channel有緩存請求方法和處理請求方法,利用生成者與消費(fèi)者模式來處理存儲請求,利用反向等待來判斷任務(wù)池的非空狀態(tài)。

            Channel參與者:

          1. package whut.threadpool; 
          2. //用到了生產(chǎn)者與消費(fèi)者模式 
          3. //生成線程池,接受客戶端線程的請求,找到一個工作線程分配該客戶端請求 
          4. public class Channel { 
          5.     private static final int MAX_REQUEST = 100;// 并發(fā)數(shù)目,就是同時可以接受多少個客戶端請求 
          6.     //利用數(shù)組來存放請求,每次從數(shù)組末尾添加請求,從開頭移除請求來處理 
          7.     private final Request[] requestQueue;// 存儲接受客戶線程的數(shù)目 
          8.     private int tail;//下一次存放Request的位置 
          9.     private int head;//下一次獲取Request的位置 
          10.     private int count;// 當(dāng)前request數(shù)量 
          11.     private final WorkerThread[] threadPool;// 存儲線程池中的工作線程 
          12.     // 運(yùn)用數(shù)組來存儲 
          13.     public Channel(int threads) { 
          14.         this.requestQueue = new Request[MAX_REQUEST]; 
          15.         this.head = 0
          16.         this.head = 0
          17.         this.count = 0
          18.         threadPool = new WorkerThread[threads]; 
          19.         // 啟動工作線程 
          20.         for (int i = 0; i < threadPool.length; i++) { 
          21.             threadPool[i] = new WorkerThread("Worker-" + i, this); 
          22.         } 
          23.     } 
          24.     public void startWorkers() { 
          25.         for (int i = 0; i < threadPool.length; i++) { 
          26.             threadPool[i].start(); 
          27.         } 
          28.     } 
          29.     // 接受客戶端請求線程 
          30.     public synchronized void putRequest(Request request) { 
          31.         // 當(dāng)Request的數(shù)量大于或等于同時接受的數(shù)目時候,要等待 
          32.         while (count >= requestQueue.length) 
          33.             try { 
          34.                 wait(); 
          35.             } catch (InterruptedException e) { 
          36.             } 
          37.         requestQueue[tail] = request; 
          38.         tail = (tail + 1) % requestQueue.length; 
          39.         count++; 
          40.         notifyAll(); 
          41.     } 
          42.     // 處理客戶端請求線程 
          43.     public synchronized Request takeRequest() { 
          44.         while (count <= 0
          45.             try { 
          46.                 wait(); 
          47.             } catch (InterruptedException e) { 
          48.             } 
          49.         Request request = requestQueue[head]; 
          50.         head = (head + 1) % requestQueue.length; 
          51.         count--; 
          52.         notifyAll(); 
          53.         return request; 
          54.     } 
          55. }


           客戶端請求線程:

          1. package whut.threadpool; 
          2. import java.util.Random; 
          3. //向Channel發(fā)送Request請求的 
          4. public class ClientThread extends Thread{ 
          5.     private final Channel channel; 
          6.     private static final Random random=new Random(); 
          7.                                                                 
          8.     public ClientThread(String name,Channel channel) 
          9.     { 
          10.         super(name); 
          11.         this.channel=channel; 
          12.     } 
          13.     public void run() 
          14.     { 
          15.         try
          16.             for(int i=0;true;i++) 
          17.             { 
          18.                 Request request=new Request(getName(),i); 
          19.                 channel.putRequest(request); 
          20.                 Thread.sleep(random.nextInt(1000)); 
          21.             } 
          22.         }catch(InterruptedException e) 
          23.         { 
          24.         } 
          25.     } 
          26. }

            工作線程:

          1. package whut.threadpool; 
          2. //具體工作線程 
          3. public class WorkerThread extends Thread{ 
          4.                                                        
          5.     private final Channel channel; 
          6.     public WorkerThread(String name,Channel channel) 
          7.     { 
          8.       super(name); 
          9.       this.channel=channel; 
          10.     } 
          11.                                                        
          12.     public void run() 
          13.     { 
          14.         while(true
          15.         { 
          16.             Request request=channel.takeRequest(); 
          17.             request.execute(); 
          18.         } 
          19.     } 
          20. }

            線程池實(shí)例2:

            工作線程:

            利用同步塊來處理,利用Vector來存儲客戶端請求。在Channel有緩存請求方法和處理請求方法,利用生成者與消費(fèi)者模式來處理存儲請求,利用正向等待來判斷任務(wù)池的非空狀態(tài)。

            這種實(shí)例,可以借鑒到網(wǎng)絡(luò)ServerSocket處理用戶請求的模式中,有很好的擴(kuò)展性與實(shí)用性。

            利用Vector來存儲,依舊是每次集合的最后一個位置添加請求,從開始位置移除請求來處理。

           Channel參與者:

          1. package whut.threadpool2; 
          2. import java.util.Vector; 
          3. /* 
          4.  * 這個主要的作用如下 
          5.  * 0,緩沖客戶請求線程(利用生產(chǎn)者與消費(fèi)者模式) 
          6.  * 1,存儲客戶端請求的線程 
          7.  * 2,初始化啟動一定數(shù)量的線程 
          8.  * 3,主動來喚醒處于任務(wù)池中wait set的一些線程來執(zhí)行任務(wù) 
          9.  */ 
          10. public class Channel { 
          11.     public final static int THREAD_COUNT=4
          12.     public static void main(String[] args) { 
          13.       //定義兩個集合,一個是存放客戶端請求的,利用Vector, 
          14.       //一個是存儲線程的,就是線程池中的線程數(shù)目 
          15.                               
          16.       //Vector是線程安全的,它實(shí)現(xiàn)了Collection和List 
          17.       //Vector 類可以實(shí)現(xiàn)可增長的對象數(shù)組。與數(shù)組一樣, 
          18.       //它包含可以使用整數(shù)索引進(jìn)行訪問的組件。但Vector 的大小可以根據(jù)需要增大或縮小, 
          19.       //以適應(yīng)創(chuàng)建 Vector 后進(jìn)行添加或移除項(xiàng)的操作。 
          20.       //Collection中主要包括了list相關(guān)的集合以及set相關(guān)的集合,Queue相關(guān)的集合 
          21.       //注意:Map不是Collection的子類,都是java.util.*下的同級包 
          22.       Vector pool=new Vector(); 
          23.       //工作線程,初始分配一定限額的數(shù)目 
          24.       WorkerThread[] workers=new WorkerThread[THREAD_COUNT]; 
          25.                            
          26.       //初始化啟動工作線程 
          27.       for(int i=0;i<workers.length;i++) 
          28.       { 
          29.           workers[i]=new WorkerThread(pool); 
          30.           workers[i].start(); 
          31.       } 
          32.                             
          33.       //接受新的任務(wù),并且將其存儲在Vector中 
          34.       Object task=new Object();//模擬的任務(wù)實(shí)體類 
          35.       //此處省略具體工作 
          36.       //在網(wǎng)絡(luò)編程中,這里就是利用ServerSocket來利用ServerSocket.accept接受一個Socket從而喚醒線程 
          37.                             
          38.       //當(dāng)有具體的請求達(dá)到 
          39.       synchronized(pool) 
          40.       { 
          41.           pool.add(pool.size(), task); 
          42.           pool.notifyAll();//通知所有在pool wait set中等待的線程,喚醒一個線程進(jìn)行處理 
          43.       } 
          44.       //注意上面這步驟添加任務(wù)池請求,以及通知線程,都可以放在工作線程內(nèi)部實(shí)現(xiàn) 
          45.       //只需要定義該方法為static,在方法體用同步塊,且共享的線程池也是static即可 
          46.                             
          47.       //下面這步,可以有可以沒有根據(jù)實(shí)際情況 
          48.       //取消等待的線程 
          49.       for(int i=0;i<workers.length;i++) 
          50.       { 
          51.           workers[i].interrupt(); 
          52.       } 
          53.     } 
          54. }

            工作線程:

          1. package whut.threadpool2; 
          2. import java.util.List; 
          3. public class WorkerThread extends Thread { 
          4.     private List pool;//任務(wù)請求池 
          5.     private static int fileCompressed=0;//所有實(shí)例共享的 
          6.                       
          7.     public WorkerThread(List pool) 
          8.     { 
          9.           this.pool=pool;  
          10.     } 
          11.                       
          12.     //利用靜態(tài)synchronized來作為整個synchronized類方法,僅能同時一個操作該類的這個方法 
          13.     private static synchronized void incrementFilesCompressed() 
          14.     { 
          15.         fileCompressed++; 
          16.     } 
          17.                       
          18.     public void run() 
          19.     { 
          20.         //保證無限循環(huán)等待中 
          21.         while(true
          22.         { 
          23.             //共享互斥來訪問pool變量 
          24.             synchronized(pool) 
          25.             { 
          26.                 //利用多線程設(shè)計模式中的 
          27.                 //Guarded Suspension Pattern,警戒條件為pool不為空,否則無限的等待中 
          28.                 while(pool.isEmpty()) 
          29.                 { 
          30.                     try
          31.                         pool.wait();//進(jìn)入pool的wait set中等待著,釋放了pool的鎖 
          32.                     }catch(InterruptedException e) 
          33.                     { 
          34.                     } 
          35.                 } 
          36.                 //當(dāng)線程被喚醒,需要重新獲取pool的鎖, 
          37.                 //再次繼續(xù)執(zhí)行synchronized代碼塊中其余的工作 
          38.                 //當(dāng)不為空的時候,繼續(xù)再判斷是否為空,如果不為空,則跳出循環(huán) 
          39.                 //必須先從任務(wù)池中移除一個任務(wù)來執(zhí)行,統(tǒng)一用從末尾添加,從開始處移除 
          40.                                   
          41.                 pool.remove(0);//獲取任務(wù)池中的任務(wù),并且要進(jìn)行轉(zhuǎn)換 
          42.             } 
          43.             //下面是線程所要處理的具體工作 
          44.         } 
          45.     } 
          46. }

          posted on 2013-05-29 10:23 順其自然EVO 閱讀(275) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          <2013年5月>
          2829301234
          567891011
          12131415161718
          19202122232425
          2627282930311
          2345678

          導(dǎo)航

          統(tǒng)計

          常用鏈接

          留言簿(55)

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 台北县| 资源县| 大兴区| 东兰县| 新乐市| 洪洞县| 芦溪县| 锦州市| 华宁县| 武冈市| 绥芬河市| 商都县| 灵丘县| 通城县| 永昌县| 水富县| 白河县| 东城区| 白山市| 偏关县| 吕梁市| 敦化市| 宁海县| 东海县| 康保县| 正蓝旗| 策勒县| 南部县| 临泽县| 来宾市| 宣城市| 龙泉市| 花莲市| 巴中市| 台州市| 华容县| 兰坪| 石门县| 两当县| 张家港市| 遂昌县|