posts - 28,  comments - 13,  trackbacks - 0

          9.?? 同步(Concurrent)

          ????
          1.????? Executor接口

          ???? Executor接口提供了一個(gè)類(lèi)似于線程池的管理工具。用于只需要往Executor中提交Runnable對(duì)象,剩下的啟動(dòng)線程等工作,都會(huì)有對(duì)應(yīng)的實(shí)現(xiàn)類(lèi)來(lái)完成。ScheduledExecutorService比ExecutorService增加了,時(shí)間上的控制,即用戶可以在提交的時(shí)候額外的定義該任務(wù)的啟動(dòng)時(shí)機(jī),以及隨后的執(zhí)行間隔和延遲等。

          ???? 例子:

          ???? 任務(wù):

          ???? public class ETask implements Runnable{

          ????????? private int id = 0;

          ????????? public ETask(int id){

          ?????????????? this.id = id;

          ????????? }

          ????????? public void run(){

          ?????????????? try{

          ?????????????????? System.out.println(id+" Start");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Do");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Exit");

          ????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }

          ???? }

          ????

          ???? 測(cè)試類(lèi):

          ???? public class ETest{

          ????????? public static void main(String[] args){???????

          ????????????? ExecutorService executor = Executors.newFixedThreadPool(2);

          ????????????? for(int i=0;i<5;i++){

          ?????????????????? Runnable r = new ETask(i);

          ?????????????????? executor.execute(r);

          ????????????? }

          ????????????? executor.shutdown();

          ????????? }

          ???? }

          ?

          ???? 輸出:

          ???? 0 Start

          ???? 1 Start

          ???? 0 Do

          ???? 1 Do

          ???? 0 Exit

          ???? 2 Start

          ???? 1 Exit

          ???? 3 Start

          ???? 2 Do

          ???? 3 Do

          ???? 2 Exit

          ???? 3 Exit

          ???? 4 Start

          ???? 4 Do

          ???? 4 Exit

          ?

          2.????? Future和Callable

          ???? Callable是一個(gè)類(lèi)似于Runnable的接口,他與Runnable的區(qū)別是,她在執(zhí)行完畢之后能夠返回結(jié)果。Future用于獲取線程的執(zhí)行結(jié)果,或者取消已向Executor的任務(wù)。當(dāng)我們通過(guò)Future提供的get()方法獲取任務(wù)的執(zhí)行結(jié)果時(shí),如果任務(wù)沒(méi)有完成,則調(diào)用get()方法的線程將會(huì)被阻塞,知道任務(wù)完成為止。一般我們都會(huì)使用Future的實(shí)現(xiàn)類(lèi)FutureTask。

          ???? 例子:

          ???? Callable對(duì)象:

          ???? public class ETask implements Callable{

          ????????? private String id = null;

          ????????? public ETask(String id){

          ?????????????? this.id = id;

          ????????? }

          ????

          ????????? public String call(){

          ????????????? try{

          ?????????????????? System.out.println(id+" Start");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Do");

          ?????????????????? Thread.sleep(1000);

          ?????????????????? System.out.println(id+" Exit");??????????

          ????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????????? return id;

          ????????? }

          ???? }

          ?

          ???? 測(cè)試類(lèi):

          ???? public class ETest{

          ????????? public static void main(String[] args){???????

          ????????????? ExecutorService executor = Executors.newFixedThreadPool(2);

          ????????????? for(int i=0;i<5;i++){???????????

          ?????????????????? try{

          ?????????????????????? Callable c = new ETask(String.valueOf(i));

          ??????????????????????? FutureTask ft = new FutureTask(c);

          ??????????????????????? executor.execute(ft);

          ??????????????????????? System.out.println("Finish:" + ft.get());?????????

          ?????????????????? }catch(Exception e){

          ?????????????????????? e.printStackTrace();

          ?????????????????? }

          ????????????? }

          ?????????????? executor.shutdown();

          ????????? }

          ???? }

          ?

          ???? 輸出:

          ???? 0 Start

          ???? 0 Do

          ???? 0 Exit

          ???? Finish:0

          ???? 1 Start

          ???? 1 Do

          ???? 1 Exit

          ???? Finish:1

          ???? 2 Start

          ???? …

          3.????? CompletionService和ExecutorCompletionService

          ???? CompletionService類(lèi)似于一個(gè)Executor和Queue的混合。我們可以通過(guò)submit()向CompletionService提交任務(wù),然后通過(guò)poll()來(lái)獲取第一個(gè)完成的任務(wù),也可以通過(guò)take()來(lái)阻塞等待下一個(gè)完成的任務(wù)。ExecutorCompletionService是CompletionService的實(shí)現(xiàn)類(lèi),他需要提供一個(gè)Executor作為構(gòu)造函數(shù)的參數(shù)。

          ???? 例子:

          ???? Executor executor = …;

          ???? CompletionService cs = new ExecutorCompletionService(executor);

          ???? Future fs = cs.submit(…);

          ???? Future ft = cs.take();

          ?

          4.????? Semaphore

          ???? 信號(hào)量是用于同步和互斥的低級(jí)原語(yǔ)。信號(hào)量提供的acquire()和release()操作,與操作系統(tǒng)上的p,v操作同。

          ???? 例子:

          ???? 緩沖區(qū):

          ???? public class Buffer{

          ????????? private Semaphore s = null;

          ????????? private Semaphore p = null;

          ????????? Vector<Integer> v = new Vector<Integer>();

          ?????????

          ????????? public Buffer(int capacity){

          ?????????????? s = new Semaphore(capacity);

          ????????????? p = new Semaphore(0);

          ????????? }

          ????

          ????????? public void put(int i){

          ????????????? try{

          ?????????????????? s.acquire();

          ?????????????????? v.add(new Integer(i));

          ?????????????????? p.release();

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }

          ????

          ????????? public int get(){?

          ?????????????? int i = 0;

          ????????????? try{

          ?????????????????? p.acquire();

          ?????????????????? i = ((Integer)v.remove(0)).intValue();

          ?????????????????? s.release();

          ????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ?????????????? return i;

          ????????? }???

          ???? }

          ?

          ???? 生產(chǎn)者:

          ???? public class Producer extends Thread{

          ????????? private Buffer b;

          ????????? private int count;

          ????????? private int step;

          ????????? private int id;

          ?

          ????????? public Producer(Buffer b,int step,int id){

          ?????????????? this.b =? b;

          ????????????? this.step = step;

          ????????????? this.id = id;

          ?????????????? count = 0;

          ????????? }

          ????

          ????????? public void run(){

          ????????????? try{

          ?????????????????? while(true){

          ?????????????????????? System.out.println("In put");

          ??????????????????????? b.put(count);

          ??????????????????????? System.out.println("Producer "+id+":"+count);

          ??????????????????????? count++;

          ?????????????????????? Thread.sleep(step);

          ??????????????????????? System.out.println("Out put");

          ?????????????????? }

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }

          ???? }

          ?

          ???? 消費(fèi)者:

          ???? public class Consumer extends Thread{

          ????????? private Buffer b;

          ????????? private int step;

          ????????? private int id;

          ????

          ????????? public Consumer(Buffer b,int step,int id){

          ????????????? this.b = b;

          ?????????????? this.step = step;

          ????????????? this.id = id;

          ????????? }

          ?????????

          ????????? public void run(){

          ????????????? try{

          ?????????????????? while(true){

          ??????????????????????? System.out.println("In get");

          ?????????????????????? System.out.println("\t\tConsume "+id+":"+b.get());

          ??????????????????????? System.out.println("Out get");

          ??????????????????????? Thread.sleep(step);

          ?????????????????? }

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }???

          ????????? }

          ???? }

          ?

          ???? 測(cè)試程序:

          ???? public class CPTest{

          ????????? public static void main(String[] args){

          ?????????????? Buffer b = new Buffer(3);

          ????????????? Consumer c1 = new Consumer(b,1000,1);

          ????????????? Consumer c2 = new Consumer(b,1000,2);

          ?????????????? Producer p1 = new Producer(b,100,1);

          ????????????? Producer p2 = new Producer(b,100,2);

          ????????

          ????????????? c1.start();

          ?????????????? c2.start();

          ????????????? p1.start();

          ????????????? p2.start();

          ????????? }

          ???? }

          ?

          5.????? CyclicBarrier

          ???? CyclicBarrier可以讓一組線程在某一個(gè)時(shí)間點(diǎn)上進(jìn)行等待,當(dāng)所有進(jìn)程都到達(dá)該等待點(diǎn)后,再繼續(xù)往下執(zhí)行。CyclicBarrier使用完以后,通過(guò)調(diào)用reset()方法,可以重用該CyclicBarrier。線程通過(guò)調(diào)用await()來(lái)減少計(jì)數(shù)。

          ?

          CyclicBarrier
          ?
          ?

          ?

          ?

          ?

          ?

          ?

          ?

          ???? 例子:

          ???? 任務(wù):

          ???? public class Task extends Thread{

          ????????? private String id;

          ????????? private CyclicBarrier c;

          ????????? private int time;

          ????

          ????????? public Task(CyclicBarrier c,String id,int time){

          ?????????????? this.c = c;

          ????????????? this.id = id;

          ?????????????? this.time = time;

          ????????? }

          ????

          ????????? public void run(){

          ?????????????? try{

          ?????????????????? System.out.println(id+" Start");

          ????????????????? Thread.sleep(time);

          ?????????????????? System.out.println(id+" Finish");

          ?????????????????? c.await();

          ?????????????????? System.out.println(id+" Exit");?????????

          ?????????????? }catch(Exception e){

          ?????????????????? e.printStackTrace();

          ????????????? }

          ????????? }???

          ???? }

          ?

          ???? 測(cè)試類(lèi):

          ???? public class Test{

          ????????? public static void main(String[] args){

          ????????????? CyclicBarrier c = new CyclicBarrier(3,new Runnable(){

          ?????????????????? public void run(){

          ??????????????????????? System.out.println("All Work Done");

          ?????????????????? }

          ????????????? });

          ?????????????? Task t1 = new Task(c,"1",1000);

          ????????????? Task t2 = new Task(c,"2",3000);

          ????????????? Task t3 = new Task(c,"3",5000);

          ?????????????? t1.start();

          ????????????? t2.start();

          ????????????? t3.start();???????

          ????????? }

          ???? }

          ?

          ???? 輸出結(jié)果:

          ???? 1 Start

          ???? 2 Start

          ???? 3 Start

          ???? 1 Finish

          ???? 2 Finish

          ???? 3 Finish

          ???? All Work Done

          ???? 3 Exit

          ???? 1 Exit

          ???? 2 Exit

          ?

          6.????? CountdownLatch

          ???? CountdownLatch具有與CyclicBarrier相似的功能,也能讓一組線程在某個(gè)點(diǎn)上進(jìn)行同步。但是與CyclicBarrier不同的是:1.CountdownLatch不能重用,2.線程在CountdownLatch上調(diào)用await()操作一定會(huì)被阻塞,直到計(jì)數(shù)值為0時(shí)才會(huì)被喚醒,而且計(jì)數(shù)值只能通過(guò)conutDown()方法進(jìn)行減少。

          特別的,當(dāng)CountdownLatch的值為1時(shí),該Latch被稱(chēng)為“啟動(dòng)大門(mén)”,所有任務(wù)線程都在該Latch上await(),直到某個(gè)非任務(wù)線程調(diào)用countDown()觸發(fā),所有任務(wù)線程開(kāi)始同時(shí)工作。

          ?

          7.????? Exchanger

          ???? Exchanger是一個(gè)類(lèi)似于計(jì)數(shù)值為2的CyclicBarrier。她允許兩個(gè)線程在某個(gè)點(diǎn)上進(jìn)行數(shù)據(jù)交換。

          ?????? 例子:

          ???? public class FillAndEmpty {

          ???????? Exchanger<DataBuffer> exchanger = new Exchanger();

          ???????? DataBuffer initialEmptyBuffer = ... a made-up type

          ???????? DataBuffer initialFullBuffer = ...

          ?

          ???????? public class FillingLoop implements Runnable {

          ????????????? public void run() {

          ?????????????????? DataBuffer currentBuffer = initialEmptyBuffer;

          ?????????????????? try {

          ?????????????????????? while (currentBuffer != null) {

          ??????????????????????????? addToBuffer(currentBuffer);

          ??????????????????????????? if (currentBuffer.full())

          ???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);

          ?????????????????????? }

          ?????????????????? }catch(InterruptedException ex) { ... handle ... }

          ????????????? }

          ???????? }

          ?

          ???????? public class EmptyingLoop implements Runnable {

          ????????????? public void run() {

          ?????????????????? DataBuffer currentBuffer = initialFullBuffer;

          ?????????????????? try {

          ?????????????????????? while (currentBuffer != null) {

          ??????????????????????????? takeFromBuffer(currentBuffer);

          ??????????????????????????? if (currentBuffer.empty())

          ???????????????????????????????? currentBuffer = exchanger.exchange(currentBuffer);

          ?????????????????????? }

          ?????????????????? } catch (InterruptedException ex) { ... handle ...}

          ????????????? }

          ???????? }

          ?

          ???????? public void start() {

          ????????????? new Thread(new FillingLoop()).start();

          ????????????? new Thread(new EmptyingLoop()).start();

          ???????? }

          ???? }

          Exchange
          ?
          ?

          ?

          ????

          ?

          ?

          ?

          ?

          ?

          ?

          ?

          ?

          8.????? Lock,Condition

          ???? 鎖是最基本的同步原語(yǔ)。通過(guò)在鎖上面調(diào)用lock()和unlock()操作,可以達(dá)到與synchronized關(guān)鍵字相似的效果,但是有一點(diǎn)要注意的是,鎖必須顯式釋放,如果由于拋出異常,而沒(méi)有釋放鎖,將導(dǎo)致死鎖出現(xiàn)。Condition提供的await(),signal(),signal()操作,與原來(lái)的wai(),notify(),notifyAll()操作具有相似的含義。Lock的兩個(gè)主要子類(lèi)是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允許多人讀,而一人寫(xiě)。

          ???? 例子:

          ???? 使用Lock和Condition的生產(chǎn)者,消費(fèi)者問(wèn)題

          ???? public class BoundedBuffer {

          ???????? final Lock lock = new ReentrantLock();

          ???????? final Condition notFull? = lock.newCondition();

          ???????? final Condition notEmpty = lock.newCondition();

          ???????? final Object[] items = new Object[100];

          ???????? int putptr, takeptr, count;

          ????????

          ???????? public void put(Object x) throws InterruptedException {

          ????????????? lock.lock();

          ????????????? try {

          ?????????????????? while (count == items.length)

          ?????????????????????? notFull.await();

          ?????????????????? items[putptr] = x;

          ?????????????????? if (++putptr == items.length)

          ??????????????????????? putptr = 0;

          ?????????????????? ++count;

          ?????????????????? notEmpty.signal();

          ????????????? } finally {

          ?????????????????? lock.unlock();

          ?????????????? }

          ????????? }

          ????

          ????????? public Object take() throws InterruptedException {

          ?????????????? lock.lock();

          ????????????? try {

          ?????????????????? while (count == 0)

          ?????????????????????? notEmpty.await();

          ?????????????????? Object x = items[takeptr];

          ?????????????????? if (++takeptr == items.length)

          ??????????????????????? takeptr = 0;

          ?????????????????? --count;

          ?????????????????? notFull.signal();

          ?????????????????? return x;

          ?????????????? } finally {

          ?????????????????? lock.unlock();

          ????????????? }

          ????????? }

          ???? }???

          ?

          9.????? 小結(jié):新的concurrent包提供了一個(gè)從低到高的同步操作。

          ?

          posted on 2007-01-22 17:33 Lib 閱讀(1618) 評(píng)論(0)  編輯  收藏 所屬分類(lèi): Java
          <2025年8月>
          272829303112
          3456789
          10111213141516
          17181920212223
          24252627282930
          31123456



          我的JavaEye博客
          http://lib.javaeye.com


          常用鏈接

          留言簿(2)

          隨筆分類(lèi)

          文章分類(lèi)

          FLASH

          Java

          搜索

          •  

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 南靖县| 乌拉特中旗| 环江| 茌平县| 新昌县| 喀什市| 宜春市| 六枝特区| 闽清县| 中江县| 雷州市| 成安县| 阳高县| 库车县| 维西| 虎林市| 德州市| 鄱阳县| 靖宇县| 绥滨县| 阜平县| 吉木乃县| 应用必备| 桑日县| 老河口市| 襄樊市| 泗阳县| 搜索| 内黄县| 务川| 鲁山县| 慈溪市| 伊吾县| 商丘市| 隆林| 和林格尔县| 昭觉县| 普兰县| 衡水市| 磐石市| 根河市|