JAVA學習點點滴滴

          用開放的腦子去闖蕩;用開闊的視野去拼搏;用平和的身心去磨練;用美好的理想去追求!

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            18 隨筆 :: 1 文章 :: 2 評論 :: 0 Trackbacks

          李濤,Sun中國工程研究院工程師

          概述

          1:三個新加的多線程包

          2Callable Future接口

          3:新的線程執行架構

          4LockersCondition接口

          5: Synchronizer:同步裝置

          6: BlockingQueue接口

          7Atomics 原子級變量

          8Concurrent Collections 共點聚集

          概述

          Java 1995 年面世以來得到了廣泛得一個運用,但是對多線程編程的支持 Java 很長時間一直停留在初級階段。在 Java 5.0 之前 Java 里的多線程編程主要是通過 Thread 類, Runnable 接口, Object 對象中的 wait() 、 notify() 、 notifyAll() 等方法和 synchronized 關鍵詞來實現的。這些工具雖然能在大多數情況下解決對共享資源的管理和線程間的調度,但存在以下幾個問題

          1.????? 過于原始,拿來就能用的功能有限,即使是要實現簡單的多線程功能也需要編寫大量的代碼。這些工具就像匯編語言一樣難以學習和使用,比這更糟糕的是稍有不慎它們還可能被錯誤地使用,而且這樣的錯誤很難被發現。

          2.????? 如果使用不當,會使程序的運行效率大大降低。

          3.????? 為了提高開發效率,簡化編程,開發人員在做項目的時候往往需要寫一些共享的工具來實現一些普遍適用的功能。但因為沒有規范,相同的工具會被重復地開發,造成資源浪費。

          4.????? 因為鎖定的功能是通過 Synchronized 來實現的,這是一種塊結構,只能對代碼中的一段代碼進行鎖定,而且鎖定是單一的。如以下代碼所示:

          synchronized lock {

          ? ?? // 執行對共享資源的操作

          ??? ……

          }

          ? 一些復雜的功能就很難被實現。比如說如果程序需要取得 lock A lock B 來進行操作 1 ,然后需要取得 lock C 并且釋放 lock A 來進行操作 2 , Java 5.0 之前的多線程框架就顯得無能為力了。

          因為這些問題,程序員對舊的框架一直頗有微詞。這種情況一直到 Java 5.0 才有較大的改觀,一系列的多線程工具包被納入了標準庫文件。這些工具包括了一個新的多線程程序的執行框架,使編程人員可方便地協調和調度線程的運行,并且新加入了一些高性能的常用的工具,使程序更容易編寫,運行效率更高。本文將分類并結合例子來介紹這些新加的多線程工具。

          在我們開始介紹 Java 5.0 里的新 Concurrent 工具前讓我們先來看一下一個用舊的多線程工具編寫的程序,這個程序里有一個 Server 線程,它需要啟動兩個 Component , Server 線程需等到 Component 線程完畢后再繼續。相同的功能在 Synchronizer 一章里用新加的工具 CountDownLatch 有相同的實現。兩個程序,孰優孰劣,哪個程序更容易編寫,哪個程序更容易理解,相信大家看過之后不難得出結論。

          public class ServerThread {

          ????? Object concLock = new Object();

          ????? int count = 2;

          public void runTwoThreads() {

          ????? // 啟動兩個線程去初始化組件

          ??????????? new Thread(new ComponentThread1(this)).start();

          ??????????? new Thread(new ComponentThread1(this)).start();

          ??????????? // Wait for other thread

          while(count != 0) {

          ????????????????? synchronized(concLock) {

          ??????????????????????? try {

          ????????????????????????????? concLock.wait();

          ????????????????????????????? System.out.println("Wake up.");

          ??????????????????????? } catch (InterruptedException ie) { // 處理異常 }

          ????????????????? }

          ??????????? }

          ??????????? System.out.println("Server is up.");

          ????? }

          ????? public void callBack() {

          synchronized(concLock) {

          ????????????????? count--;

          ????????????????? concLock.notifyAll();

          ??????????? }

          ????? }

          ????? public static void main(String[] args){

          ??????????? ServerThread server = new ServerThread();

          ??????????? server.runTwoThreads();

          ????? }

          }

          ?

          public class ComponentThread1 implements Runnable {

          ????? private ServerThread server;

          ????? public ComponentThread1(ServerThread server) {

          ??????????? this.server = server;

          ????? }

          public void run() {

          ????? // 做組件初始化的工作

          ??????????? System.out.println("Do component initialization.");

          ??????????? server.callBack();

          ????? }

          }

          1:三個新加的多線程包

          Java 5.0 里新加入了三個多線程包: java.util.concurrent, java.util.concurrent.atomic, java.util.concurrent.locks.

          • java.util.concurrent 包含了常用的多線程工具,是新的多線程工具的主體。
          • java.util.concurrent.atomic 包含了不用加鎖情況下就能改變值的原子變量,比如說 AtomicInteger 提供了 addAndGet() 方法。 Add Get 是兩個不同的操作,為了保證別的線程不干擾,以往的做法是先鎖定共享的變量,然后在鎖定的范圍內進行兩步操作。但用 AtomicInteger.addAndGet() 就不用擔心鎖定的事了,其內部實現保證了這兩步操作是在原子量級發生的,不會被別的線程干擾。
          • java.util.concurrent.locks 包包含鎖定的工具。

          2Callable Future接口

          Callable 是類似于 Runnable 的接口,實現 Callable 接口的類和實現 Runnable 的類都是可被其它線程執行的任務。 Callable Runnable 有幾點不同:

          • Callable 規定的方法是 call() ,而 Runnable 規定的方法是 run().
          • Callable 的任務執行后可返回值,而 Runnable 的任務是不能返回值的。
          • call ()方法可拋出異常,而 run ()方法是不能拋出異常的。
          • 運行 Callable 任務可拿到一個 Future 對象,通過 Future 對象可了解任務執行情況,可取消任務的執行,還可獲取任務執行的結果。

          以下是 Callable 的一個例子:

          public class DoCallStuff implements Callable<String>{ // *1

          ??????? private int aInt;

          ??????? public DoCallStuff(int aInt) {

          ??????????????? this.aInt = aInt;

          ??????? }

          ??????? public String call() throws Exception { //*2

          ??????????????? boolean resultOk = false;

          ??????????????? if(aInt == 0){

          ??????????????????????? resultOk = true;

          ??????????????? }? else if(aInt == 1){

          ??????????????????????? while(true){ //infinite loop

          ?????????????????? ?????????????System.out.println("looping....");

          ??????????????????????????????? Thread.sleep(3000);

          ??????????????????????? }

          ??????????????? } else {

          ??????????????????????? throw new Exception("Callable terminated with Exception!"); //*3

          ??????????????? }

          ??????????????? if(resultOk){

          ??????????????????????? return "Task done.";

          ??????????????? } else {

          ??????????????????????? return "Task failed";

          ??????????????? }

          ??????? }

          }

          *1: 名為 DoCallStuff 類實現了 Callable<String> , String 將是 call 方法的返回值類型。例子中用了 String ,但可以是任何 Java 類。

          *2: call 方法的返回值類型為 String ,這是和類的定義相對應的。并且可以拋出異常。

          *3: call 方法可以拋出異常,如加重的斜體字所示。

          以下是調用 DoCallStuff 的主程序。

          import java.util.concurrent.ExecutionException;

          import java.util.concurrent.ExecutorService;

          import java.util.concurrent.Executors;

          import java.util.concurrent.Future;

          public class Executor {

          ??????? public static void main(String[] args){

          ??????????????? //*1

          ??????????????? DoCallStuff call1 = new DoCallStuff(0);

          ??????????????? DoCallStuff call2 = new DoCallStuff(1);

          ??????????????? DoCallStuff call3 = new DoCallStuff(2);

          ??????????????? //*2

          ??????????????? ExecutorService es = Executors.newFixedThreadPool(3);

          ??????????????? //*3

          ??????????????? Future<String> future1 = es.submit(call1);

          ??????????????? Future<String> future2 = es.submit(call2);

          ???? ???????????Future<String> future3 = es.submit(call3);

          ??????????????? try {

          ??????????????????????? //*4

          ??????????????????????? System.out.println(future1.get());

          ???????????????????????? //*5

          ??????????????????????? Thread.sleep(3000);

          ????????????????? ??????System.out.println("Thread 2 terminated? :" + future2.cancel(true));

          ??????????????????????? //*6

          ??????????????????????? System.out.println(future3.get());

          ??????????????? } catch (ExecutionException ex) {

          ??????????????????????? ex.printStackTrace();

          ??????????????? } catch (InterruptedException ex) {

          ??????????????????????? ex.printStackTrace();

          ??????????????? }

          ??????? }

          }

          *1: 定義了幾個任務

          *2: 初始了任務執行工具。任務的執行框架將會在后面解釋。

          *3: 執行任務,任務啟動時返回了一個 Future 對象,如果想得到任務執行的結果或者是異常可對這個 Future 對象進行操作。 Future 所含的值必須跟 Callable 所含的值對映,比如說例子中 Future<String> 對印 Callable<String>

          *4: 任務 1 正常執行完畢, future1.get() 會返回線程的值

          *5: 任務 2 在進行一個死循環,調用 future2.cancel(true) 來中止此線程。傳入的參數標明是否可打斷線程, true 表明可以打斷。

          *6: 任務 3 拋出異常,調用 future3.get() 時會引起異常的拋出。

          ? 運行 Executor 會有以下運行結果:

          looping....

          Task done. //*1

          looping....

          looping....//*2

          looping....

          looping....

          looping....

          looping....

          Thread 2 terminated? :true //*3

          //*4

          java.util.concurrent.ExecutionException: java.lang.Exception: Callable terminated with Exception!

          ??????? at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:205)

          ??????? at java.util.concurrent.FutureTask.get(FutureTask.java:80)

          ??????? at concurrent.Executor.main(Executor.java:43)

          ??????? …….

          *1: 任務 1 正常結束

          *2: 任務 2 是個死循環,這是它的打印結果

          *3: 指示任務 2 被取消

          *4: 在執行 future3.get() 時得到任務 3 拋出的異常

          3:新的任務執行架構

          Java 5.0 之前啟動一個任務是通過調用 Thread 類的 start() 方法來實現的,任務的提于交和執行是同時進行的,如果你想對任務的執行進行調度或是控制同時執行的線程數量就需要額外編寫代碼來完成。 5.0 里提供了一個新的任務執行架構使你可以輕松地調度和控制任務的執行,并且可以建立一個類似數據庫連接池的線程池來執行任務。這個架構主要有三個接口和其相應的具體類組成。這三個接口是 Executor, ExecutorService ScheduledExecutorService ,讓我們先用一個圖來顯示它們的關系:

          ?

          圖的左側是接口,圖的右側是這些接口的具體類。注意 Executor 是沒有直接具體實現的。

          Executor 接口:

          是用來執行 Runnable 任務的,它只定義一個方法:

          • execute(Runnable command) :執行 Ruannable 類型的任務

          ExecutorService 接口:

          ExecutorService 繼承了 Executor 的方法,并提供了執行 Callable 任務和中止任務執行的服務,其定義的方法主要有:

          • submit(task) :可用來提交 Callable Runnable 任務,并返回代表此任務的 Future 對象
          • invokeAll(collection of tasks) :批處理任務集合,并返回一個代表這些任務的 Future 對象集合
          • shutdown() :在完成已提交的任務后關閉服務,不再接受新任務
          • shutdownNow() :停止所有正在執行的任務并關閉服務。
          • isTerminated() :測試是否所有任務都執行完畢了。
          • isShutdown() :測試是否該 ExecutorService 已被關閉

          ScheduledExecutorService 接口

          ExecutorService 的基礎上, ScheduledExecutorService 提供了按時間安排執行任務的功能,它提供的方法主要有:

          • schedule(task, initDelay): 安排所提交的 Callable Runnable 任務在 initDelay 指定的時間后執行。
          • scheduleAtFixedRate() :安排所提交的 Runnable 任務按指定的間隔重復執行
          • scheduleWithFixedDelay() :安排所提交的 Runnable 任務在每次執行完后,等待 delay 所指定的時間后重復執行。

          代碼: ScheduleExecutorService 的例子

          public class ScheduledExecutorServiceTest {

          ??????? public static void main(String[] args)

          ?????????????? throws InterruptedException, ExecutionException{

          ?????????????? //*1

          ??????????????? ScheduledExecutorService service = Executors.newScheduledThreadPool(2);

          ??????????????? //*2

          ??????????????? Runnable task1 = new Runnable() {

          ???????????????????? public void run() {

          ??????????????????????? System.out.println("Task repeating.");

          ???????????????????? }

          ??????????????? };

          ??????????????? //*3

          ??????????????? final ScheduledFuture future1 =

          ??????????????????????? service.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);

          ??????????????? //*4

          ??????????????? ScheduledFuture<String> future2 = service.schedule(new Callable<String>(){

          ???????????????????? public String call(){

          ???????????????????????????? future1.cancel(true);

          ???????????????????????????? return "task cancelled!";

          ??????????????? ?????}

          ??????????????? }, 5, TimeUnit.SECONDS);

          ??????????????? System.out.println(future2.get());

          //*5

          service.shutdown();

          ??????? }

          }

          這個例子有兩個任務,第一個任務每隔一秒打印一句“ Task repeating , 第二個任務在 5 秒鐘后取消第一個任務。

          *1: 初始化一個 ScheduledExecutorService 對象,這個對象的線程池大小為 2

          *2: 用內函數的方式定義了一個 Runnable 任務。

          *3: 調用所定義的 ScheduledExecutorService 對象來執行任務,任務每秒執行一次。能重復執行的任務一定是 Runnable 類型。注意我們可以用 TimeUnit 來制定時間單位,這也是 Java 5.0 里新的特征, 5.0 以前的記時單位是微秒,現在可精確到奈秒。

          *4: 調用 ScheduledExecutorService 對象來執行第二個任務,第二個任務所作的就是在 5 秒鐘后取消第一個任務。

          *5: 關閉服務。

          Executors

          雖然以上提到的接口有其實現的具體類,但為了方便 Java 5.0 建議使用 Executors 的工具類來得到 Executor 接口的具體對象,需要注意的是 Executors 是一個類,不是 Executor 的復數形式。 Executors 提供了以下一些 static 的方法:

          • callable(Runnable task): Runnable 的任務轉化成 Callable 的任務
          • newSingleThreadExecutor: 產生一個 ExecutorService 對象,這個對象只有一個線程可用來執行任務,若任務多于一個,任務將按先后順序執行。
          • newCachedThreadPool(): 產生一個 ExecutorService 對象,這個對象帶有一個線程池,線程池的大小會根據需要調整,線程執行完任務后返回線程池,供執行下一次任務使用。
          • newFixedThreadPool(int poolSize) :產生一個 ExecutorService 對象,這個對象帶有一個大小為 poolSize 的線程池,若任務數量大于 poolSize ,任務會被放在一個 queue 里順序執行。
          • newSingleThreadScheduledExecutor :產生一個 ScheduledExecutorService 對象,這個對象的線程池大小為 1 ,若任務多于一個,任務將按先后順序執行。
          • newScheduledThreadPool(int poolSize): 產生一個 ScheduledExecutorService 對象,這個對象的線程池大小為 poolSize ,若任務數量大于 poolSize ,任務會在一個 queue 里等待執行

          以下是得到和使用 ExecutorService 的例子:

          代碼:如何調用 Executors 來獲得各種服務對象

          //Single Threaded ExecutorService

          ???? ExecutorService singleThreadeService = Executors.newSingleThreadExecutor();

          //Cached ExecutorService

          ???? ExecutorService cachedService = Executors.newCachedThreadPool();

          //Fixed number of ExecutorService

          ???? ExecutorService fixedService = Executors.newFixedThreadPool(3);

          //Single ScheduledExecutorService

          ???? ScheduledExecutorService singleScheduledService =

          ????????? Executors.newSingleThreadScheduledExecutor();

          //Fixed number of ScheduledExecutorService

          ScheduledExecutorService fixedScheduledService =

          ???? Executors.newScheduledThreadPool(3);

          4LockersCondition接口

          在多線程編程里面一個重要的概念是鎖定,如果一個資源是多個線程共享的,為了保證數據的完整性,在進行事務性操作時需要將共享資源鎖定,這樣可以保證在做事務性操作時只有一個線程能對資源進行操作,從而保證數據的完整性。在 5.0 以前,鎖定的功能是由 Synchronized 關鍵字來實現的,這樣做存在幾個問題:

          • 每次只能對一個對象進行鎖定。若需要鎖定多個對象,編程就比較麻煩,一不小心就會出現死鎖現象。
          • 如果線程因拿不到鎖定而進入等待狀況,是沒有辦法將其打斷的

          Java 5.0 里出現兩種鎖的工具可供使用,下圖是這兩個工具的接口及其實現:

          Lock 接口

          ReentrantLock Lock 的具體類, Lock 提供了以下一些方法:

          • lock(): 請求鎖定,如果鎖已被別的線程鎖定,調用此方法的線程被阻斷進入等待狀態。
          • tryLock() :如果鎖沒被別的線程鎖定,進入鎖定狀態,并返回 true 。若鎖已被鎖定,返回 false ,不進入等待狀態。此方法還可帶時間參數,如果鎖在方法執行時已被鎖定,線程將繼續等待規定的時間,若還不行才返回 false 。
          • unlock() :取消鎖定,需要注意的是 Lock 不會自動取消,編程時必須手動解鎖。

          代碼:

          // 生成一個鎖

          Lock lock = new ReentrantLock();

          public void accessProtectedResource() {

          ? lock.lock(); // 取得鎖定

          ? try {

          ??? // 對共享資源進行操作

          ? } finally {

          ??? // 一定記著把鎖取消掉,鎖本身是不會自動解鎖的

          ??? lock.unlock() ;

          ? }

          }

          ReadWriteLock 接口

          為了提高效率有些共享資源允許同時進行多個讀的操作,但只允許一個寫的操作,比如一個文件,只要其內容不變可以讓多個線程同時讀,不必做排他的鎖定,排他的鎖定只有在寫的時候需要,以保證別的線程不會看到數據不完整的文件。 ReadWriteLock 可滿足這種需要。 ReadWriteLock 內置兩個 Lock ,一個是讀的 Lock ,一個是寫的 Lock 。多個線程可同時得到讀的 Lock ,但只有一個線程能得到寫的 Lock ,而且寫的 Lock 被鎖定后,任何線程都不能得到 Lock ReadWriteLock 提供的方法有:

          • readLock(): 返回一個讀的 lock
          • writeLock(): 返回一個寫的 lock, lock 是排他的。

          ReadWriteLock 的例子:

          public class FileOperator{

          ????? // 初始化一個 ReadWriteLock

          ????? ReadWriteLock lock = new ReentrantReadWriteLock();

          public String read() {

          ????? // 得到 readLock 并鎖定

          ??????????? Lock readLock = lock.readLock();

          ?????? ?????readLock.lock();

          ??????????? try {

          ????????????????? // 做讀的工作

          ????????????????? return "Read something";

          ??????????? } finally {

          ???????????????? readLock.unlock();

          ??????????? }

          ????? }

          ?????

          public void write(String content) {

          ????? // 得到 writeLock 并鎖定

          ??????????? Lock writeLock = lock.writeLock();

          ??????????? writeLock.lock();

          ??????????? try {

          ????????????????? // 做讀的工作

          ??????????? } finally {

          ???????????????? writeLock.unlock();

          ??????????? }

          ????? }

          }

          ?

          需要注意的是 ReadWriteLock 提供了一個高效的鎖定機理,但最終程序的運行效率是和程序的設計息息相關的,比如說如果讀的線程和寫的線程同時在等待,要考慮是先發放讀的 lock 還是先發放寫的 lock 。如果寫發生的頻率不高,而且快,可以考慮先給寫的 lock 。還要考慮的問題是如果一個寫正在等待讀完成,此時一個新的讀進來,是否要給這個新的讀發鎖,如果發了,可能導致寫的線程等很久。等等此類問題在編程時都要給予充分的考慮。

          Condition 接口:

          有時候線程取得 lock 后需要在一定條件下才能做某些工作,比如說經典的 Producer Consumer 問題, Consumer 必須在籃子里有蘋果的時候才能吃蘋果,否則它必須暫時放棄對籃子的鎖定,等到 Producer 往籃子里放了蘋果后再去拿來吃。而 Producer 必須等到籃子空了才能往里放蘋果,否則它也需要暫時解鎖等 Consumer 把蘋果吃了才能往籃子里放蘋果。在 Java 5.0 以前,這種功能是由 Object 類的 wait(), notify() notifyAll() 等方法實現的,在 5.0 里面,這些功能集中到了 Condition 這個接口來實現, Condition 提供以下方法:

          • await() :使調用此方法的線程放棄鎖定,進入睡眠直到被打斷或被喚醒。
          • signal(): 喚醒一個等待的線程
          • signalAll() :喚醒所有等待的線程

          Condition 的例子:

          public class Basket {?????

          Lock lock = new ReentrantLock();

          // 產生 Condition 對象

          ???? Condition produced = lock.newCondition();

          ???? Condition consumed = lock.newCondition();

          ???? boolean available = false;

          ?? ??

          ???? public void produce() throws InterruptedException {

          ?????????? lock.lock();

          ?????????? try {

          ???????????????? if(available){

          ??????????????????? consumed.await(); // 放棄 lock 進入睡眠 ?

          ???????????????? }

          ???????????????? /* 生產蘋果 */

          ???????????????? System.out.println("Apple produced.");

          ???????????????? available = true;

          ???????????????? produced.signal(); // 發信號喚醒等待這個 Condition 的線程

          ?????????? } finally {

          ???????????????? lock.unlock();

          ?????????? }

          ???? }

          ????

          ???? public void consume() throws InterruptedException {

          ?????????? lock.lock();

          ?????????? try {

          ???????????????? if(!available){

          ?????????????????????? produced.await();// 放棄 lock 進入睡眠 ?

          ???????????????? }

          ???????????????? /* 吃蘋果 */

          ???????????????? System.out.println("Apple consumed.");

          ???????????????? available = false;

          ???????????????? consumed.signal();// 發信號喚醒等待這個 Condition 的線程

          ?????????? } finally {

          ???????????????? lock.unlock();

          ?????????? }

          ???? }?????

          }

          ConditionTester:

          public class ConditionTester {

          ?????

          ????? public static void main(String[] args) throws InterruptedException{

          final Basket basket = new Basket();

          // 定義一個 producer

          ??????????? Runnable producer = new Runnable() {

          ????????????????? public void run() {

          ??????????????????????? try {

          ????????????????????????????? basket.produce();

          ?????? ?????????????????} catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          };

          // 定義一個 consumer

          ??????????? Runnable consumer = new Runnable() {

          ????????????????? public void run() {

          ??????????????????????? try {

          ????????????????????????????? basket.consume();

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          };

          // 各產生 10 consumer producer

          ??????????? ExecutorService service = Executors.newCachedThreadPool();

          ??????????? for(int i=0; i < 10; i++)

          ????????????????? service.submit(consumer);

          ??????????? Thread.sleep(2000);

          ??????????? for(int i=0; i<10; i++)

          ????????????????? service.submit(producer);

          ??????????? service.shutdown();

          ????? }?????

          }

          5: Synchronizer:同步裝置

          Java 5.0 里新加了 4 個協調線程間進程的同步裝置,它們分別是 Semaphore, CountDownLatch, CyclicBarrier Exchanger.

          Semaphore:

          用來管理一個資源池的工具, Semaphore 可以看成是個通行證,線程要想從資源池拿到資源必須先拿到通行證, Semaphore 提供的通行證數量和資源池的大小一致。如果線程暫時拿不到通行證,線程就會被阻斷進入等待狀態。以下是一個例子:

          public class Pool {

          ??? ??ArrayList<String> pool = null;

          ????? Semaphore pass = null;

          ????? public Pool(int size){

          ??????????? // 初始化資源池

          ??????????? pool = new ArrayList<String>();

          ??????????? for(int i=0; i<size; i++){

          ????????????????? pool.add("Resource "+i);

          ??????????? }

          ??? ???????? //Semaphore 的大小和資源池的大小一致

          ??????????? pass = new Semaphore(size);

          ????? }

          ????? public String get() throws InterruptedException{

          ??????????? // 獲取通行證 , 只有得到通行證后才能得到資源

          ??????????? pass.acquire();

          ??????????? return getResource();

          ????? }

          ????? public void put(String resource){

          ??????????? // 歸還通行證,并歸還資源

          ??????????? pass.release();

          ??????????? releaseResource(resource);

          ????? }

          ???? private synchronized String getResource() {

          ??????????? String result = pool.get(0);

          ??????????? pool.remove(0);

          ??????????? System.out.println("Give out "+result);

          ??????????? return result;

          ????? }

          ????? private synchronized void releaseResource(String resource) {

          ??????????? System.out.println("return "+resource);

          ??????????? pool.add(resource);

          ????? }

          }

          SemaphoreTest:

          public class SemaphoreTest {

          ????? public static void main(String[] args){

          ??????????? final Pool aPool = new Pool(2);

          ??????????? Runnable worker = new Runnable() {

          ????????????????? public void run() {

          ??????????????????????? String resource = null;

          ??????? ????????????????try {

          ????????????????????????????? // 取得 resource

          ????????????????????????????? resource = aPool.get();

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ??????????????????????? // resource 做工作

          ??????????????????????? System.out.println("I worked on "+resource);

          ??????????????????????? // 歸還 resource

          ??????????????????????? aPool.put(resource);

          ????????????????? }

          ??????????? };

          ??????????? ExecutorService service = Executors.newCachedThreadPool();

          ??????????? for(int i=0; i<20; i++){

          ????????????????? service.submit(worker);

          ??????????? }

          ??????????? service.shutdown();

          ????? }????

          }

          CountDownLatch:

          CountDownLatch 是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。比如說一個 Server 啟動時需要初始化 4 個部件, Server 可以同時啟動 4 個線程去初始化這 4 個部件,然后調用 CountDownLatch(4).await() 阻斷進入等待,每個線程完成任務后會調用一次 CountDownLatch.countDown() 來倒計數 , 4 個線程都結束時 CountDownLatch 的計數就會降低為 0 ,此時 Server 就會被喚醒繼續下一步操作。 CountDownLatch 的方法主要有:

          • await() :使調用此方法的線程阻斷進入等待
          • countDown(): 倒計數,將計數值減 1
          • getCount(): 得到當前的計數值

          CountDownLatch 的例子:一個 server 調了三個 ComponentThread 分別去啟動三個組件,然后 server 等到組件都啟動了再繼續。

          public class Server {

          ????? public static void main(String[] args) throws InterruptedException{

          ??????????? System.out.println("Server is starting.");

          ?????????? ?// 初始化一個初始值為 3 CountDownLatch

          ??????????? CountDownLatch latch = new CountDownLatch(3);

          ??????????? // 3 個線程分別去啟動 3 個組件

          ??????????? ExecutorService service = Executors.newCachedThreadPool();

          ??????????? service.submit(new ComponentThread(latch, 1));

          ????????? ??service.submit(new ComponentThread(latch, 2));

          ??????????? service.submit(new ComponentThread(latch, 3));

          ??????????? service.shutdown();

          ??????????? // 進入等待狀態

          ??????????? latch.await();

          ??????????? // 當所需的三個組件都完成時, Server 就可繼續了

          ??????????? System.out.println("Server is up!");

          ????? }

          }

          ?

          public class ComponentThread implements Runnable{

          ????? CountDownLatch latch;

          ????? int ID;

          ????? /** Creates a new instance of ComponentThread */

          ????? public ComponentThread(CountDownLatch latch, int ID) {

          ??????????? this.latch = latch;

          ??????????? this.ID = ID;

          ????? }

          ????? public void run() {

          ??????????? System.out.println("Component "+ID + " initialized!");

          ??????????? // 將計數減一

          ??????????? latch.countDown();

          ????? }????

          }

          運行結果:

          Server is starting.

          Component 1 initialized!

          Component 3 initialized!

          Component 2 initialized!

          Server is up!

          CyclicBarrier:

          CyclicBarrier 類似于 CountDownLatch 也是個計數器,不同的是 CyclicBarrier 數的是調用了 CyclicBarrier.await() 進入等待的線程數,當線程數達到了 CyclicBarrier 初始時規定的數目時,所有進入等待狀態的線程被喚醒并繼續。 CyclicBarrier 就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。 CyclicBarrier 初始時還可帶一個 Runnable 的參數,此 Runnable 任務在 CyclicBarrier 的數目達到后,所有其它線程被喚醒前被執行。

          CyclicBarrier 提供以下幾個方法:

          • await() :進入等待
          • getParties() :返回此 barrier 需要的線程數
          • reset() :將此 barrier 重置

          以下是使用 CyclicBarrier 的一個例子:兩個線程分別在一個數組里放一個數,當這兩個線程都結束后,主線程算出數組里的數的和(這個例子比較無聊,我沒有想到更合適的例子)

          public class MainThread {

          public static void main(String[] args)

          ????? throws InterruptedException, BrokenBarrierException, TimeoutException{

          ??????????? final int[] array = new int[2];

          ??????????? CyclicBarrier barrier = new CyclicBarrier(2,

          ????????????????? new Runnable() {// 在所有線程都到達 Barrier 時執行

          ????????????????? public void run() {

          ??????????????????????? System.out.println("Total is:"+(array[0]+array[1]));

          ????????????????? }

          ??????????? });???????????

          ??????????? // 啟動線程

          ??????????? new Thread(new ComponentThread(barrier, array, 0)).start();

          ??????????? new Thread(new ComponentThread(barrier, array, 1)).start();???

          ????? }?????

          }

          ?

          public class ComponentThread implements Runnable{

          ????? CyclicBarrier barrier;

          ????? int ID;

          ????? int[] array;

          ????? public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

          ??????????? this.barrier = barrier;

          ??????????? this.ID = ID;

          ??????????? this.array = array;

          ????? }

          ????? public void run() {

          ??????????? try {

          ????????????????? array[ID] = new Random().nextInt();

          ????????????????? System.out.println(ID+ " generates:"+array[ID]);

          ????????????????? // 該線程完成了任務等在 Barrier

          ????????????????? barrier.await();

          ??????????? } catch (BrokenBarrierException ex) {

          ????????????????? ex.printStackTrace();

          ??????????? } catch (InterruptedException ex) {

          ????????????????? ex.printStackTrace();

          ??????????? }

          ????? }

          }

          Exchanger:

          顧名思義 Exchanger 讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過 Exchanger 雙方互換杯子,服務生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復始。

          class FillAndEmpty {

          ????? // 初始化一個 Exchanger ,并規定可交換的信息類型是 DataCup

          ????? Exchanger<Cup> exchanger = new Exchanger();

          ???? ?Cup initialEmptyCup = ...; // 初始化一個空的杯子

          ????? Cup initialFullCup = ...; // 初始化一個裝滿水的杯子

          ????? // 服務生線程

          ????? class Waiter implements Runnable {

          ??????????? public void run() {

          ????????????????? Cup currentCup = initialEmptyCup;

          ????????????????? try {

          ?????? ?????????????????// 往空的杯子里加水

          ??????????????????????? currentCup.addWater();

          ??????????????????????? // 杯子滿后和顧客的空杯子交換

          ??????????????????????? currentCup = exchanger.exchange(currentCup);

          ????????????????? } catch (InterruptedException ex) { ... handle ... }

          ? ?????????? }

          ????? }

          ????? // 顧客線程

          ????? class Customer implements Runnable {

          ??????????? public void run() {

          ????????????????? DataCup currentCup = initialFullCup;

          ????????????????? try {

          ??????????????????????? // 把杯子里的水喝掉

          ??????????????????????? currentCup.drinkFromCup();

          ??????????????????????? // 將空杯子和服務生的滿杯子交換

          ??????????????????????? currentCup = exchanger.exchange(currentCup);

          ????????????????? } catch (InterruptedException ex) { ... handle ...}

          ??????????? }

          ????? }

          ?????

          ????? void start() {

          ??????? ????new Thread(new Waiter()).start();

          ??????????? new Thread(new Customer()).start();

          ????? }

          }

          6: BlockingQueue接口

          BlockingQueue 是一種特殊的 Queue ,若 BlockingQueue 是空的,從 BlockingQueue 取東西的操作將會被阻斷進入等待狀態直到 BlocingkQueue 進了新貨才會被喚醒。同樣,如果 BlockingQueue 是滿的任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到 BlockingQueue 里有新的空間才會被喚醒繼續操作。 BlockingQueue 提供的方法主要有:

          • add(anObject): anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容納返回 true ,否則拋出 IllegalStateException 異常。
          • offer(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容納返回 true ,否則返回 false 。
          • put(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 沒有空間,調用此方法的線程被阻斷直到 BlockingQueue 里有新的空間再繼續。
          • poll(time) :取出 BlockingQueue 里排在首位的對象,若不能立即取出可等 time 參數規定的時間。取不到時返回 null 。
          • take() :取出 BlockingQueue 里排在首位的對象,若 BlockingQueue 為空,阻斷進入等待狀態直到 BlockingQueue 有新的對象被加入為止。

          根據不同的需要 BlockingQueue 4 種具體實現:

          • ArrayBlockingQueue :規定大小的 BlockingQueue ,其構造函數必須帶一個 int 參數來指明其大小。其所含的對象是以 FIFO (先入先出)順序排序的。
          • LinkedBlockingQueue :大小不定的 BlockingQueue ,若其構造函數帶一個規定大小的參數,生成的 BlockingQueue 有大小限制,若不帶大小參數,所生成的 BlockingQueue 的大小由 Integer.MAX_VALUE 來決定。其所含的對象是以 FIFO (先入先出)順序排序的。 LinkedBlockingQueue ArrayBlockingQueue 比較起來,它們背后所用的數據結構不一樣,導致 LinkedBlockingQueue 的數據吞吐量要大于 ArrayBlockingQueue ,但在線程數量很大時其性能的可預見性低于 ArrayBlockingQueue 。
          • PriorityBlockingQueue :類似于 LinkedBlockingQueue ,但其所含對象的排序不是 FIFO ,而是依據對象的自然排序順序或者是構造函數所帶的 Comparator 決定的順序。
          • SynchronousQueue :特殊的 BlockingQueue ,對其的操作必須是放和取交替完成的。

          下面是用 BlockingQueue 來實現 Producer Consumer 的例子:

          public class BlockingQueueTest {

          ????? static BlockingQueue<String> basket;

          ????? public BlockingQueueTest() {

          ??????????? // 定義了一個大小為 2 BlockingQueue ,也可根據需要用其他的具體類

          ??????????? basket = new ArrayBlockingQueue<String>(2);

          ????? }

          ????? class Producor implements Runnable {

          ??????????? public void run() {

          ????????????????? while(true){

          ??????????????????????? try {

          ????????????????????????????? // 放入一個對象,若 basket 滿了,等到 basket 有位置

          ?????????????????????? ???????basket.put("An apple");

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          ??????????? }

          ????? }

          ????? class Consumer implements Runnable {

          ? ??????????public void run() {

          ????????????????? while(true){

          ??????????????????????? try {

          ????????????????????????????? // 取出一個對象,若 basket 為空,等到 basket 有東西為止

          ????????????????????????????? String result = basket.take();

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          ????? ??????}???????????

          ????? }

          ????? public void execute(){

          ??????????? for(int i=0; i<10; i++){

          ????????????????? new Thread(new Producor()).start();

          ????????????????? new Thread(new Consumer()).start();

          ??????????? }???????????

          ????? }

          ????? public static void main(String[] args){

          ??????????? BlockingQueueTest test = new BlockingQueueTest();

          ??????????? test.execute();

          ????? }?????

          }

          7Atomics 原子級變量

          原子量級的變量,主要的類有 AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference …… 。這些原子量級的變量主要提供兩個方法:

          • compareAndSet(expectedValue, newValue): 比較當前的值是否等于 expectedValue , 若等于把當前值改成 newValue ,并返回 true 。若不等,返回 false 。
          • getAndSet(newValue): 把當前值改為 newValue ,并返回改變前的值。

          這些原子級變量利用了現代處理器( CPU )的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。

          8Concurrent Collections 共點聚集

          Java 的聚集框架里可以調用 Collections.synchronizeCollection(aCollection) 將普通聚集改變成同步聚集,使之可用于多線程的環境下。 但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。 Java 5.0 里提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。 Java 5.0 目前提供的共點聚集類有: ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList CopyOnWriteArraySet.

          ?



          Trackback: http://tb.blog.csdn.net/TrackBack.aspx?PostId=846454

          posted on 2006-12-24 22:37 海思 閱讀(360) 評論(1)  編輯  收藏 所屬分類: Java小技術

          評論

          # re: [轉]Java 5.0多線程編程 2006-12-27 07:06 jrobot[匿名]
          非常有用的東東,剛在api里看到這幾個包想用用呢,非常謝謝分享  回復  更多評論
            

          主站蜘蛛池模板: 项城市| 罗定市| 连州市| 方山县| 海兴县| 临夏县| 汉沽区| 财经| 会昌县| 衡水市| 南充市| 泰安市| 广东省| 沙田区| 兴文县| 曲靖市| 定西市| 青铜峡市| 兰西县| 库伦旗| 曲周县| 景东| 白朗县| 云安县| 勃利县| 石柱| 石阡县| 古田县| 海兴县| 长葛市| 肥西县| 祥云县| 文登市| 普宁市| 合江县| 耿马| 工布江达县| 四会市| 高雄市| 宝丰县| 防城港市|