keep moving!

          We must not cease from exploration. And the end of all our exploring will be to arrive where we began and to know the place for the first time.
          隨筆 - 37, 文章 - 2, 評論 - 3, 引用 - 0
          數(shù)據(jù)加載中……

          多線程 — Concurrent包簡介

          多線程 — Concurrent包簡介

          Util.concurrent工具包概述
          Doug Lea
          State University of New York at Oswego
          dl@cs.oswego.edu

          http://gee.cs.oswego.edu

          翻譯:

          Cocia Lin(cocia@163.com)

          Huihoo.org

          原文

          http://gee.cs.oswego.edu/dl/cpjslides/util.pdf

          要點
          –目標和結(jié)構(gòu)

          –主要的接口和實現(xiàn)

          Sync:獲得/釋放(acquire/release) 協(xié)議

          Channel:放置/取走(put/take) 協(xié)議

          Executor:執(zhí)行Runnable任務(wù)

          –每一個部分都有一些關(guān)聯(lián)的接口和支持類

          –簡單的涉及其他的類和特性

          目標
          –一些簡單的接口

          -但是覆蓋大部分程序員需要小心處理代碼的問題

          – 高質(zhì)量實現(xiàn)

          -正確的,保守的,有效率的,可移植的

          –可能作為將來標準的基礎(chǔ)

          -獲取經(jīng)驗和收集反饋信息

          Sync
          – acquire/release協(xié)議的主要接口

          -用來定制鎖,資源管理,其他的同步用途

          - 高層抽象接口

          - 沒有區(qū)分不同的加鎖用法

          –實現(xiàn)

          -Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore, PrioritySemaphore

          n 還有,有幾個簡單的實現(xiàn),例如ObservableSync, LayeredSync

          獨占鎖
          try {

          lock.acquire();

          try {

          action();

          }

          finally {

          lock.release();

          }

          }

          catch (InterruptedException ie) { ... }

          – Java同步塊不適用的時候使用它

          - 超時,回退(back-off)

          - 確保可中斷

          - 大量迅速鎖定

          - 創(chuàng)建Posix風格應用(condvar)

          獨占例子
          class ParticleUsingMutex {

          int x; int y;

          final Random rng = new Random();

          final Mutex mutex = new Mutex();

          public void move() {

          try {

          mutex.acquire();

          try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }

          finally { mutex.release(); }

          }

          catch (InterruptedException ie) {

          Thread.currentThread().interrupt(); }

          }

          public void draw(Graphics g) {

          int lx, ly;

          try {

          mutex.acquire();

          try { lx = x; ly = y; }

          finally { mutex.release(); }

          }

          catch (InterruptedException ie) {

          Thread.currentThread().interrupt(); return; }

          g.drawRect(lx, ly, 10, 10);

          }

          }

          回退(Backoff)例子
          class CellUsingBackoff {

          private long val;

          private final Mutex mutex = new Mutex();

          void swapVal(CellUsingBackoff other)

          throws InterruptedException {

          if (this == other) return; // alias check

          for (;;) {

          mutex.acquire();

          try {

          I f (other.mutex.attempt(0)) {

          try {

          long t = val;

          val = other.val;

          other.val = t;

          return;

          }

          finally { other.mutex.release(); }

          }

          }

          finally { mutex.release(); };

          Thread.sleep(100); // heuristic retry interval

          }

          }

          }

          讀寫鎖
          interface ReadWriteLock {

          Sync readLock();

          Sync writeLock();

          }

          – 管理一對鎖

          - 和普通的鎖一樣的使用習慣

          – 對集合類很有用

          -半自動的方式實現(xiàn)SyncSet, SyncMap, …

          – 實現(xiàn)者使用不同的鎖策略

          - WriterPreference, ReentrantWriterPreference,

          ReaderPreference, FIFO

          ReadWriteLock例子
          – 示范在讀寫鎖中執(zhí)行任何Runnable的包裝類

          class WithRWLock {

          final ReadWriteLock rw;

          public WithRWLock(ReadWriteLock l) { rw = l; }

          public void performRead(Runnable readCommand)

          throws InterruptedException {

          rw.readLock().acquire();

          try { readCommand.run(); }

          finally { rw.readlock().release(); }

          }

          public void performWrite(…) // similar

          }

          閉鎖(Latch)
          – 閉鎖是開始時設(shè)置為false,但一旦被設(shè)置為true,他將永遠保持true狀態(tài)

          - 初始化標志

          - 流結(jié)束定位

          - 線程中斷

          - 事件出發(fā)指示器

          – CountDown和他有點類似,不同的是,CountDown需要一定數(shù)量的觸發(fā)設(shè)置,而不是一次

          – 非常簡單,但是廣泛使用的類

          - 替換容易犯錯的開發(fā)代碼

          Latch Example 閉鎖例子
          class Worker implements Runnable {

          Latch startSignal;

          Worker(Latch l) { startSignal = l; }

          public void run() {

          startSignal.acquire();

          // … doWork();

          }

          }

          class Driver { // …

          void main() {

          Latch ss = new Latch();

          for (int i = 0; i < N; ++i) // make threads

          new Thread(new Worker(ss)).start();

          doSomethingElse(); // don’t let run yet

          ss.release(); // now let all threads proceed

          }

          }

          信號(Semaphores)
          -- 服務(wù)于數(shù)量有限的占有者

          - 使用許可數(shù)量構(gòu)造對象(通常是0)

          - 如果需要一個許可才能獲取,等待,然后取走一個許可

          - 釋放的時候?qū)⒃S可添加回來

          -- 但是真正的許可并沒有轉(zhuǎn)移(But no actual permits change hands.)

          - 信號量僅僅保留當前的計數(shù)值

          -- 應用程序

          - 鎖:一個信號量可以被用作互斥體(mutex)

          - 一個獨立的等待緩存或者資源控制的操作

          - 設(shè)計系統(tǒng)是想忽略底層的系統(tǒng)信號

          -- (phores ‘remember’ past signals)記住已經(jīng)消失的信號量

          信號量例子
          class Pool {

          ArrayList items = new ArrayList();

          HashSet busy = new HashSet();

          final Semaphore available;

          public Pool(int n) {

          available = new Semaphore(n);

          // … somehow initialize n items …;

          }

          public Object getItem() throws InterruptedException {

          available.acquire();

          return doGet();

          }

          public void returnItem(Object x) {

          if (doReturn(x)) available.release();

          }

          synchronized Object doGet() {

          Object x = items.remove(items.size()-1);

          busy.add(x); // put in set to check returns

          return x;

          }

          synchronized boolean doReturn(Object x) {

          return busy.remove(x); // true if was present

          }

          }

          屏障(Barrier)
          – 多部分同步接口

          - 每一部分都必須等待其他的分不撞倒屏障

          – CyclicBarrier類

          - CountDown的一個可以重新設(shè)置的版本

          - 對于反復劃分算法很有用(iterative partitioning algorithms)

          – Rendezvous類

          - 一個每部分都能夠和其他部分交換信息的屏障

          - 行為類似同時的在一個同步通道上put和take

          - 對于資源交換協(xié)議很有用(resource-exchange protocols)

          通道(Channel)
          –為緩沖,隊列等服務(wù)的主接口

          – 具體實現(xiàn)

          - LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot

          通道屬性
          – 被定義為Puttable和Takable的子接口

          - 允許安裝生產(chǎn)者/消費者模式執(zhí)行

          – 支持可超時的操作offer和poll

          - 當超時值是0時,可能會被阻塞

          - 所有的方法能夠拋出InterruptedException異常

          – 沒有接口需要size方法

          - 但是一些實現(xiàn)定義了這個方法

          - BoundedChannel有capacity方法

          通道例子
          class Service { // …

          final Channel msgQ = new LinkedQueue();

          public void serve() throws InterruptedException {

          String status = doService();

          msgQ.put(status);

          }

          public Service() { // start background thread

          Runnable logger = new Runnable() {

          public void run() {

          try {

          for(;;)

          System.out.println(msqQ.take());

          }

          catch(InterruptedException ie) {} }

          };

          new Thread(logger).start();

          }

          }

          運行器(Executor)
          – 類似線程的類的主接口

          - 線程池

          - 輕量級運行框架

          - 可以定制調(diào)度算法

          – 只需要支持execute(Runnable r)

          - 同Thread.start類似

          – 實現(xiàn)

          - PooledExecutor, ThreadedExecutor,QueuedExecutor, FJTaskRunnerGroup

          - 相關(guān)的ThreadFactory類允許大多數(shù)的運行器通過定制屬性使用線程

          PooledExecutor
          – 一個可調(diào)的工作者線程池,可修改得屬性如下:

          - 任務(wù)隊列的類型

          - 最大線程數(shù)

          - 最小線程數(shù)

          - 預熱(預分配)和立即(分配)線程

          - 保持活躍直到工作線程結(jié)束

          – 以后如果需要可能被一個新的代替

          - 飽和(Saturation)協(xié)議

          – 阻塞,丟棄,生產(chǎn)者運行,等等

          PooledExecutor例子
          class WebService {

          public static void main(String[] args) {

          PooledExecutor pool =

          new PooledExecutor(new BoundedBuffer(10), 20);

          pool.createThreads(4);

          try {

          ServerSocket socket = new ServerSocket(9999);

          for (;;) {

          final Socket connection = socket.accept();

          pool.execute(new Runnable() {

          public void run() {

          new Handler().process(connection);

          }});

          }

          }

          catch(Exception e) { } // die

          }

          }

          class Handler { void process(Socket s); }

          前景(Future)和可調(diào)用(Callable)
          – Callabe是類似于Runnable的接口,用來作為參數(shù)和傳遞結(jié)果

          interface Callable {

          Object call(Object arg) throws Exception;

          }

          – FutureResult管理Callable的異步執(zhí)行

          class FutureResult { // …

          // block caller until result is ready

          public Object get()

          throws InterruptedException, InvocationTargetException;

          public void set(Object result); // unblocks get

          // create Runnable that can be used with an Executor

          public Runnable setter(Callable function);

          }

          FutureResult例子
          class ImageRenderer { Image render(byte[] raw); }

          class App { // …

          Executor executor = …; // any executor

          ImageRenderer renderer = new ImageRenderer();

          public void display(byte[] rawimage) {

          try {

          FutureResult futureImage = new FutureResult();

          Runnable cmd = futureImage.setter(new Callable(){

          public Object call() {

          return renderer.render(rawImage);

          }});

          executor.execute(cmd);

          drawBorders(); // do other things while executing

          drawCaption();

          drawImage((Image)(futureImage.get())); // use future

          }

          catch (Exception ex) {

          cleanup();

          return;

          }

          }

          }

          其他的類
          – CopyOnWriteArrayList

          - 支持整個集合復制時每一個修改的無鎖訪問

          - 適合大多數(shù)的多路廣播應用程序

          – 工具包還包括了一個java.beans多路廣播類的COW版本

          – SynchronizedDouble, SynchronizedInt,SynchronizedRef, etc

          - 類似于java.lang.Double,提供可變操作的同步版本.例如,addTo,inc

          - 添加了一些象swap,commit這樣的實用操作

          未來計劃
          – 并發(fā)數(shù)據(jù)構(gòu)架

          - 一組繁重線程連接環(huán)境下有用的工具集合

          –支持側(cè)重I/O的程序

          - 事件機制的IO系統(tǒng)

          – 小版本的實現(xiàn)

          - 例如SingleSourceQueue

          –小幅度的改善

          - 使運行器更容易使用

          – 替換

          - JDK1.3 java.util.Timer 被ClockDaemon取代



          張金鵬 2007-01-31 13:08 發(fā)表評論

          posted on 2008-09-07 11:10 大石頭 閱讀(463) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導航:
           
          主站蜘蛛池模板: 泉州市| 沙河市| 安义县| 临安市| 农安县| 册亨县| 正定县| 梁河县| 鹤壁市| 丁青县| 鸡泽县| 海伦市| 绍兴市| 新蔡县| 峨眉山市| 海淀区| 乌兰察布市| 交城县| 鄂尔多斯市| 沙洋县| 长兴县| 沛县| 大石桥市| 花莲县| 天水市| 攀枝花市| 安顺市| 海林市| 策勒县| 新田县| 奉化市| 麻江县| 乐平市| 赫章县| 应用必备| 团风县| 革吉县| 鄂温| 永泰县| 宜丰县| 卢氏县|