posts - 28,  comments - 13,  trackbacks - 0

          9.?? 同步(Concurrent)

          ????
          1.????? Executor接口

          ???? Executor接口提供了一個類似于線程池的管理工具。用于只需要往Executor中提交Runnable對象,剩下的啟動線程等工作,都會有對應的實現類來完成。ScheduledExecutorService比ExecutorService增加了,時間上的控制,即用戶可以在提交的時候額外的定義該任務的啟動時機,以及隨后的執行間隔和延遲等。

          ???? 例子:

          ???? 任務:

          ???? public class ETask implements Runnable{

          ????????? private int id = 0;

          ????????? public ETask(int id){

          ?????????????? this.id = id;

          ????????? }

          ????????? public void run(){

          ?????????????? try{

          ?????????????????? System.out.println(id+" Start");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Do");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Exit");

          ????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }

          ???? }

          ????

          ???? 測試類:

          ???? public class ETest{

          ????????? public static void main(String[] args){???????

          ????????????? ExecutorService executor = Executors.newFixedThreadPool(2);

          ????????????? for(int i=0;i<5;i++){

          ?????????????????? Runnable r = new ETask(i);

          ?????????????????? executor.execute(r);

          ????????????? }

          ????????????? executor.shutdown();

          ????????? }

          ???? }

          ?

          ???? 輸出:

          ???? 0 Start

          ???? 1 Start

          ???? 0 Do

          ???? 1 Do

          ???? 0 Exit

          ???? 2 Start

          ???? 1 Exit

          ???? 3 Start

          ???? 2 Do

          ???? 3 Do

          ???? 2 Exit

          ???? 3 Exit

          ???? 4 Start

          ???? 4 Do

          ???? 4 Exit

          ?

          2.????? Future和Callable

          ???? Callable是一個類似于Runnable的接口,他與Runnable的區別是,她在執行完畢之后能夠返回結果。Future用于獲取線程的執行結果,或者取消已向Executor的任務。當我們通過Future提供的get()方法獲取任務的執行結果時,如果任務沒有完成,則調用get()方法的線程將會被阻塞,知道任務完成為止。一般我們都會使用Future的實現類FutureTask。

          ???? 例子:

          ???? Callable對象:

          ???? public class ETask implements Callable{

          ????????? private String id = null;

          ????????? public ETask(String id){

          ?????????????? this.id = id;

          ????????? }

          ????

          ????????? public String call(){

          ????????????? try{

          ?????????????????? System.out.println(id+" Start");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Do");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Exit");??????????

          ????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????????? return id;

          ????????? }

          ???? }

          ?

          ???? 測試類:

          ???? public class ETest{

          ????????? public static void main(String[] args){???????

          ????????????? ExecutorService executor = Executors.newFixedThreadPool(2);

          ????????????? for(int i=0;i<5;i++){???????????

          ?????????????????? try{

          ?????????????????????? Callable c = new ETask(String.valueOf(i));

          ??????????????????????? FutureTask ft = new FutureTask(c);

          ??????????????????????? executor.execute(ft);

          ??????????????????????? System.out.println("Finish:" + ft.get());?????????

          ?????????????????? }catch(Exception e){

          ?????????????????????? e.printStackTrace();

          ?????????????????? }

          ????????????? }

          ?????????????? executor.shutdown();

          ????????? }

          ???? }

          ?

          ???? 輸出:

          ???? 0 Start

          ???? 0 Do

          ???? 0 Exit

          ???? Finish:0

          ???? 1 Start

          ???? 1 Do

          ???? 1 Exit

          ???? Finish:1

          ???? 2 Start

          ???? …

          3.????? CompletionService和ExecutorCompletionService

          ???? CompletionService類似于一個Executor和Queue的混合。我們可以通過submit()向CompletionService提交任務,然后通過poll()來獲取第一個完成的任務,也可以通過take()來阻塞等待下一個完成的任務。ExecutorCompletionService是CompletionService的實現類,他需要提供一個Executor作為構造函數的參數。

          ???? 例子:

          ???? Executor executor = …;

          ???? CompletionService cs = new ExecutorCompletionService(executor);

          ???? Future fs = cs.submit(…);

          ???? Future ft = cs.take();

          ?

          4.????? Semaphore

          ???? 信號量是用于同步和互斥的低級原語。信號量提供的acquire()和release()操作,與操作系統上的p,v操作同。

          ???? 例子:

          ???? 緩沖區:

          ???? public class Buffer{

          ????????? private Semaphore s = null;

          ????????? private Semaphore p = null;

          ????????? Vector<Integer> v = new Vector<Integer>();

          ?????????

          ????????? public Buffer(int capacity){

          ?????????????? s = new Semaphore(capacity);

          ????????????? p = new Semaphore(0);

          ????????? }

          ????

          ????????? public void put(int i){

          ????????????? try{

          ?????????????????? s.acquire();

          ?????????????????? v.add(new Integer(i));

          ?????????????????? p.release();

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }

          ????

          ????????? public int get(){?

          ?????????????? int i = 0;

          ????????????? try{

          ?????????????????? p.acquire();

          ?????????????????? i = ((Integer)v.remove(0)).intValue();

          ?????????????????? s.release();

          ????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ?????????????? return i;

          ????????? }???

          ???? }

          ?

          ???? 生產者:

          ???? public class Producer extends Thread{

          ????????? private Buffer b;

          ????????? private int count;

          ????????? private int step;

          ????????? private int id;

          ?

          ????????? public Producer(Buffer b,int step,int id){

          ?????????????? this.b =? b;

          ????????????? this.step = step;

          ????????????? this.id = id;

          ?????????????? count = 0;

          ????????? }

          ????

          ????????? public void run(){

          ????????????? try{

          ?????????????????? while(true){

          ?????????????????????? System.out.println("In put");

          ??????????????????????? b.put(count);

          ??????????????????????? System.out.println("Producer "+id+":"+count);

          ??????????????????????? count++;

          ?????????????????????? Thread.sleep(step);

          ??????????????????????? System.out.println("Out put");

          ?????????????????? }

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }

          ???? }

          ?

          ???? 消費者:

          ???? public class Consumer extends Thread{

          ????????? private Buffer b;

          ????????? private int step;

          ????????? private int id;

          ????

          ????????? public Consumer(Buffer b,int step,int id){

          ????????????? this.b = b;

          ?????????????? this.step = step;

          ????????????? this.id = id;

          ????????? }

          ?????????

          ????????? public void run(){

          ????????????? try{

          ?????????????????? while(true){

          ??????????????????????? System.out.println("In get");

          ?????????????????????? System.out.println("\t\tConsume "+id+":"+b.get());

          ??????????????????????? System.out.println("Out get");

          ??????????????????????? Thread.sleep(step);

          ?????????????????? }

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }???

          ????????? }

          ???? }

          ?

          ???? 測試程序:

          ???? public class CPTest{

          ????????? public static void main(String[] args){

          ?????????????? Buffer b = new Buffer(3);

          ????????????? Consumer c1 = new Consumer(b,1000,1);

          ????????????? Consumer c2 = new Consumer(b,1000,2);

          ?????????????? Producer p1 = new Producer(b,100,1);

          ????????????? Producer p2 = new Producer(b,100,2);

          ????????

          ????????????? c1.start();

          ?????????????? c2.start();

          ????????????? p1.start();

          ????????????? p2.start();

          ????????? }

          ???? }

          ?

          5.????? CyclicBarrier

          ???? CyclicBarrier可以讓一組線程在某一個時間點上進行等待,當所有進程都到達該等待點后,再繼續往下執行。CyclicBarrier使用完以后,通過調用reset()方法,可以重用該CyclicBarrier。線程通過調用await()來減少計數。

          ?

          CyclicBarrier
          ?
          ?

          ?

          ?

          ?

          ?

          ?

          ?

          ???? 例子:

          ???? 任務:

          ???? public class Task extends Thread{

          ????????? private String id;

          ????????? private CyclicBarrier c;

          ????????? private int time;

          ????

          ????????? public Task(CyclicBarrier c,String id,int time){

          ?????????????? this.c = c;

          ????????????? this.id = id;

          ?????????????? this.time = time;

          ????????? }

          ????

          ????????? public void run(){

          ?????????????? try{

          ?????????????????? System.out.println(id+" Start");

          ????????????????? Thread.sleep(time);

          ?????????????????? System.out.println(id+" Finish");

          ?????????????????? c.await();

          ?????????????????? System.out.println(id+" Exit");?????????

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }???

          ???? }

          ?

          ???? 測試類:

          ???? public class Test{

          ????????? public static void main(String[] args){

          ????????????? CyclicBarrier c = new CyclicBarrier(3,new Runnable(){

          ?????????????????? public void run(){

          ??????????????????????? System.out.println("All Work Done");

          ?????????????????? }

          ????????????? });

          ?????????????? Task t1 = new Task(c,"1",1000);

          ????????????? Task t2 = new Task(c,"2",3000);

          ????????????? Task t3 = new Task(c,"3",5000);

          ?????????????? t1.start();

          ????????????? t2.start();

          ????????????? t3.start();???????

          ????????? }

          ???? }

          ?

          ???? 輸出結果:

          ???? 1 Start

          ???? 2 Start

          ???? 3 Start

          ???? 1 Finish

          ???? 2 Finish

          ???? 3 Finish

          ???? All Work Done

          ???? 3 Exit

          ???? 1 Exit

          ???? 2 Exit

          ?

          6.????? CountdownLatch

          ???? CountdownLatch具有與CyclicBarrier相似的功能,也能讓一組線程在某個點上進行同步。但是與CyclicBarrier不同的是:1.CountdownLatch不能重用,2.線程在CountdownLatch上調用await()操作一定會被阻塞,直到計數值為0時才會被喚醒,而且計數值只能通過conutDown()方法進行減少。

          特別的,當CountdownLatch的值為1時,該Latch被稱為“啟動大門”,所有任務線程都在該Latch上await(),直到某個非任務線程調用countDown()觸發,所有任務線程開始同時工作。

          ?

          7.????? Exchanger

          ???? Exchanger是一個類似于計數值為2的CyclicBarrier。她允許兩個線程在某個點上進行數據交換。

          ?????? 例子:

          ???? public class FillAndEmpty {

          ???????? Exchanger<DataBuffer> exchanger = new Exchanger();

          ???????? DataBuffer initialEmptyBuffer = ... a made-up type

          ???????? DataBuffer initialFullBuffer = ...

          ?

          ???????? public class FillingLoop implements Runnable {

          ????????????? public void run() {

          ?????????????????? DataBuffer currentBuffer = initialEmptyBuffer;

          ?????????????????? try {

          ?????????????????????? while (currentBuffer != null) {

          ??????????????????????????? addToBuffer(currentBuffer);

          ??????????????????????????? if (currentBuffer.full())

          ???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);

          ?????????????????????? }

          ?????????????????? }catch(InterruptedException ex) { ... handle ... }

          ????????????? }

          ???????? }

          ?

          ???????? public class EmptyingLoop implements Runnable {

          ????????????? public void run() {

          ?????????????????? DataBuffer currentBuffer = initialFullBuffer;

          ?????????????????? try {

          ?????????????????????? while (currentBuffer != null) {

          ??????????????????????????? takeFromBuffer(currentBuffer);

          ??????????????????????????? if (currentBuffer.empty())

          ???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);

          ?????????????????????? }

          ?????????????????? } catch (InterruptedException ex) { ... handle ...}

          ????????????? }

          ???????? }

          ?

          ???????? public void start() {

          ????????????? new Thread(new FillingLoop()).start();

          ????????????? new Thread(new EmptyingLoop()).start();

          ???????? }

          ???? }

          Exchange
          ?
          ?

          ?

          ????

          ?

          ?

          ?

          ?

          ?

          ?

          ?

          ?

          8.????? Lock,Condition

          ???? 鎖是最基本的同步原語。通過在鎖上面調用lock()和unlock()操作,可以達到與synchronized關鍵字相似的效果,但是有一點要注意的是,鎖必須顯式釋放,如果由于拋出異常,而沒有釋放鎖,將導致死鎖出現。Condition提供的await(),signal(),signal()操作,與原來的wai(),notify(),notifyAll()操作具有相似的含義。Lock的兩個主要子類是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允許多人讀,而一人寫。

          ???? 例子:

          ???? 使用Lock和Condition的生產者,消費者問題

          ???? public class BoundedBuffer {

          ???????? final Lock lock = new ReentrantLock();

          ???????? final Condition notFull? = lock.newCondition();

          ???????? final Condition notEmpty = lock.newCondition();

          ???????? final Object[] items = new Object[100];

          ???????? int putptr, takeptr, count;

          ????????

          ???????? public void put(Object x) throws InterruptedException {

          ????????????? lock.lock();

          ????????????? try {

          ?????????????????? while (count == items.length)

          ?????????????????????? notFull.await();

          ?????????????????? items[putptr] = x;

          ?????????????????? if (++putptr == items.length)

          ??????????????????????? putptr = 0;

          ?????????????????? ++count;

          ?????????????????? notEmpty.signal();

          ????????????? } finally {

          ?????????????????? lock.unlock();

          ?????????????? }

          ????????? }

          ????

          ????????? public Object take() throws InterruptedException {

          ?????????????? lock.lock();

          ????????????? try {

          ?????????????????? while (count == 0)

          ?????????????????????? notEmpty.await();

          ?????????????????? Object x = items[takeptr];

          ?????????????????? if (++takeptr == items.length)

          ??????????????????????? takeptr = 0;

          ?????????????????? --count;

          ?????????????????? notFull.signal();

          ?????????????????? return x;

          ?????????????? } finally {

          ?????????????????? lock.unlock();

          ????????????? }

          ????????? }

          ???? }???

          ?

          9.????? 小結:新的concurrent包提供了一個從低到高的同步操作。

          ?

          posted on 2007-01-22 17:33 Lib 閱讀(1611) 評論(0)  編輯  收藏 所屬分類: Java
          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345



          我的JavaEye博客
          http://lib.javaeye.com


          常用鏈接

          留言簿(2)

          隨筆分類

          文章分類

          FLASH

          Java

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 荔波县| 大兴区| 威宁| 甘德县| 瑞安市| 唐海县| 梓潼县| 清水河县| 独山县| 博野县| 当涂县| 浦北县| 天柱县| 府谷县| 堆龙德庆县| 米林县| 九江县| 神农架林区| 洱源县| 霍林郭勒市| 闽清县| 寿宁县| 曲阳县| 惠来县| 喀喇沁旗| 阿克陶县| 绥江县| 鸡东县| 鄂州市| 延长县| 寻乌县| 集贤县| 滁州市| 新巴尔虎左旗| 平定县| 勃利县| 桦南县| 平遥县| 辽阳县| 阜平县| 抚宁县|