keep moving!

          We must not cease from exploration. And the end of all our exploring will be to arrive where we began and to know the place for the first time.
          隨筆 - 37, 文章 - 2, 評論 - 3, 引用 - 0
          數據加載中……

          多線程 — Concurrent包簡介

          多線程 — Concurrent包簡介

          Util.concurrent工具包概述
          Doug Lea
          State University of New York at Oswego
          dl@cs.oswego.edu

          http://gee.cs.oswego.edu

          翻譯:

          Cocia Lin(cocia@163.com)

          Huihoo.org

          原文

          http://gee.cs.oswego.edu/dl/cpjslides/util.pdf

          要點
          –目標和結構

          –主要的接口和實現

          Sync:獲得/釋放(acquire/release) 協議

          Channel:放置/取走(put/take) 協議

          Executor:執行Runnable任務

          –每一個部分都有一些關聯的接口和支持類

          –簡單的涉及其他的類和特性

          目標
          –一些簡單的接口

          -但是覆蓋大部分程序員需要小心處理代碼的問題

          – 高質量實現

          -正確的,保守的,有效率的,可移植的

          –可能作為將來標準的基礎

          -獲取經驗和收集反饋信息

          Sync
          – acquire/release協議的主要接口

          -用來定制鎖,資源管理,其他的同步用途

          - 高層抽象接口

          - 沒有區分不同的加鎖用法

          –實現

          -Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore, PrioritySemaphore

          n 還有,有幾個簡單的實現,例如ObservableSync, LayeredSync

          獨占鎖
          try {

          lock.acquire();

          try {

          action();

          }

          finally {

          lock.release();

          }

          }

          catch (InterruptedException ie) { ... }

          – Java同步塊不適用的時候使用它

          - 超時,回退(back-off)

          - 確保可中斷

          - 大量迅速鎖定

          - 創建Posix風格應用(condvar)

          獨占例子
          class ParticleUsingMutex {

          int x; int y;

          final Random rng = new Random();

          final Mutex mutex = new Mutex();

          public void move() {

          try {

          mutex.acquire();

          try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }

          finally { mutex.release(); }

          }

          catch (InterruptedException ie) {

          Thread.currentThread().interrupt(); }

          }

          public void draw(Graphics g) {

          int lx, ly;

          try {

          mutex.acquire();

          try { lx = x; ly = y; }

          finally { mutex.release(); }

          }

          catch (InterruptedException ie) {

          Thread.currentThread().interrupt(); return; }

          g.drawRect(lx, ly, 10, 10);

          }

          }

          回退(Backoff)例子
          class CellUsingBackoff {

          private long val;

          private final Mutex mutex = new Mutex();

          void swapVal(CellUsingBackoff other)

          throws InterruptedException {

          if (this == other) return; // alias check

          for (;;) {

          mutex.acquire();

          try {

          I f (other.mutex.attempt(0)) {

          try {

          long t = val;

          val = other.val;

          other.val = t;

          return;

          }

          finally { other.mutex.release(); }

          }

          }

          finally { mutex.release(); };

          Thread.sleep(100); // heuristic retry interval

          }

          }

          }

          讀寫鎖
          interface ReadWriteLock {

          Sync readLock();

          Sync writeLock();

          }

          – 管理一對鎖

          - 和普通的鎖一樣的使用習慣

          – 對集合類很有用

          -半自動的方式實現SyncSet, SyncMap, …

          – 實現者使用不同的鎖策略

          - WriterPreference, ReentrantWriterPreference,

          ReaderPreference, FIFO

          ReadWriteLock例子
          – 示范在讀寫鎖中執行任何Runnable的包裝類

          class WithRWLock {

          final ReadWriteLock rw;

          public WithRWLock(ReadWriteLock l) { rw = l; }

          public void performRead(Runnable readCommand)

          throws InterruptedException {

          rw.readLock().acquire();

          try { readCommand.run(); }

          finally { rw.readlock().release(); }

          }

          public void performWrite(…) // similar

          }

          閉鎖(Latch)
          – 閉鎖是開始時設置為false,但一旦被設置為true,他將永遠保持true狀態

          - 初始化標志

          - 流結束定位

          - 線程中斷

          - 事件出發指示器

          – CountDown和他有點類似,不同的是,CountDown需要一定數量的觸發設置,而不是一次

          – 非常簡單,但是廣泛使用的類

          - 替換容易犯錯的開發代碼

          Latch Example 閉鎖例子
          class Worker implements Runnable {

          Latch startSignal;

          Worker(Latch l) { startSignal = l; }

          public void run() {

          startSignal.acquire();

          // … doWork();

          }

          }

          class Driver { // …

          void main() {

          Latch ss = new Latch();

          for (int i = 0; i < N; ++i) // make threads

          new Thread(new Worker(ss)).start();

          doSomethingElse(); // don’t let run yet

          ss.release(); // now let all threads proceed

          }

          }

          信號(Semaphores)
          -- 服務于數量有限的占有者

          - 使用許可數量構造對象(通常是0)

          - 如果需要一個許可才能獲取,等待,然后取走一個許可

          - 釋放的時候將許可添加回來

          -- 但是真正的許可并沒有轉移(But no actual permits change hands.)

          - 信號量僅僅保留當前的計數值

          -- 應用程序

          - 鎖:一個信號量可以被用作互斥體(mutex)

          - 一個獨立的等待緩存或者資源控制的操作

          - 設計系統是想忽略底層的系統信號

          -- (phores ‘remember’ past signals)記住已經消失的信號量

          信號量例子
          class Pool {

          ArrayList items = new ArrayList();

          HashSet busy = new HashSet();

          final Semaphore available;

          public Pool(int n) {

          available = new Semaphore(n);

          // … somehow initialize n items …;

          }

          public Object getItem() throws InterruptedException {

          available.acquire();

          return doGet();

          }

          public void returnItem(Object x) {

          if (doReturn(x)) available.release();

          }

          synchronized Object doGet() {

          Object x = items.remove(items.size()-1);

          busy.add(x); // put in set to check returns

          return x;

          }

          synchronized boolean doReturn(Object x) {

          return busy.remove(x); // true if was present

          }

          }

          屏障(Barrier)
          – 多部分同步接口

          - 每一部分都必須等待其他的分不撞倒屏障

          – CyclicBarrier類

          - CountDown的一個可以重新設置的版本

          - 對于反復劃分算法很有用(iterative partitioning algorithms)

          – Rendezvous類

          - 一個每部分都能夠和其他部分交換信息的屏障

          - 行為類似同時的在一個同步通道上put和take

          - 對于資源交換協議很有用(resource-exchange protocols)

          通道(Channel)
          –為緩沖,隊列等服務的主接口

          – 具體實現

          - LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot

          通道屬性
          – 被定義為Puttable和Takable的子接口

          - 允許安裝生產者/消費者模式執行

          – 支持可超時的操作offer和poll

          - 當超時值是0時,可能會被阻塞

          - 所有的方法能夠拋出InterruptedException異常

          – 沒有接口需要size方法

          - 但是一些實現定義了這個方法

          - BoundedChannel有capacity方法

          通道例子
          class Service { // …

          final Channel msgQ = new LinkedQueue();

          public void serve() throws InterruptedException {

          String status = doService();

          msgQ.put(status);

          }

          public Service() { // start background thread

          Runnable logger = new Runnable() {

          public void run() {

          try {

          for(;;)

          System.out.println(msqQ.take());

          }

          catch(InterruptedException ie) {} }

          };

          new Thread(logger).start();

          }

          }

          運行器(Executor)
          – 類似線程的類的主接口

          - 線程池

          - 輕量級運行框架

          - 可以定制調度算法

          – 只需要支持execute(Runnable r)

          - 同Thread.start類似

          – 實現

          - PooledExecutor, ThreadedExecutor,QueuedExecutor, FJTaskRunnerGroup

          - 相關的ThreadFactory類允許大多數的運行器通過定制屬性使用線程

          PooledExecutor
          – 一個可調的工作者線程池,可修改得屬性如下:

          - 任務隊列的類型

          - 最大線程數

          - 最小線程數

          - 預熱(預分配)和立即(分配)線程

          - 保持活躍直到工作線程結束

          – 以后如果需要可能被一個新的代替

          - 飽和(Saturation)協議

          – 阻塞,丟棄,生產者運行,等等

          PooledExecutor例子
          class WebService {

          public static void main(String[] args) {

          PooledExecutor pool =

          new PooledExecutor(new BoundedBuffer(10), 20);

          pool.createThreads(4);

          try {

          ServerSocket socket = new ServerSocket(9999);

          for (;;) {

          final Socket connection = socket.accept();

          pool.execute(new Runnable() {

          public void run() {

          new Handler().process(connection);

          }});

          }

          }

          catch(Exception e) { } // die

          }

          }

          class Handler { void process(Socket s); }

          前景(Future)和可調用(Callable)
          – Callabe是類似于Runnable的接口,用來作為參數和傳遞結果

          interface Callable {

          Object call(Object arg) throws Exception;

          }

          – FutureResult管理Callable的異步執行

          class FutureResult { // …

          // block caller until result is ready

          public Object get()

          throws InterruptedException, InvocationTargetException;

          public void set(Object result); // unblocks get

          // create Runnable that can be used with an Executor

          public Runnable setter(Callable function);

          }

          FutureResult例子
          class ImageRenderer { Image render(byte[] raw); }

          class App { // …

          Executor executor = …; // any executor

          ImageRenderer renderer = new ImageRenderer();

          public void display(byte[] rawimage) {

          try {

          FutureResult futureImage = new FutureResult();

          Runnable cmd = futureImage.setter(new Callable(){

          public Object call() {

          return renderer.render(rawImage);

          }});

          executor.execute(cmd);

          drawBorders(); // do other things while executing

          drawCaption();

          drawImage((Image)(futureImage.get())); // use future

          }

          catch (Exception ex) {

          cleanup();

          return;

          }

          }

          }

          其他的類
          – CopyOnWriteArrayList

          - 支持整個集合復制時每一個修改的無鎖訪問

          - 適合大多數的多路廣播應用程序

          – 工具包還包括了一個java.beans多路廣播類的COW版本

          – SynchronizedDouble, SynchronizedInt,SynchronizedRef, etc

          - 類似于java.lang.Double,提供可變操作的同步版本.例如,addTo,inc

          - 添加了一些象swap,commit這樣的實用操作

          未來計劃
          – 并發數據構架

          - 一組繁重線程連接環境下有用的工具集合

          –支持側重I/O的程序

          - 事件機制的IO系統

          – 小版本的實現

          - 例如SingleSourceQueue

          –小幅度的改善

          - 使運行器更容易使用

          – 替換

          - JDK1.3 java.util.Timer 被ClockDaemon取代



          張金鵬 2007-01-31 13:08 發表評論

          posted on 2008-09-07 11:10 大石頭 閱讀(463) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 南汇区| 许昌县| 德保县| 松原市| 微山县| 扶风县| 陆丰市| 宿迁市| 双柏县| 樟树市| 巧家县| 威海市| 邛崃市| 闽侯县| 南木林县| 时尚| 抚远县| 合山市| 新蔡县| 高平市| 尼勒克县| 普陀区| 涡阳县| 明光市| 中阳县| 齐齐哈尔市| 肥西县| 岳西县| 娄底市| 建湖县| 宁波市| 体育| 海晏县| 云浮市| 灌南县| 蓬安县| 如东县| 临桂县| 抚顺县| 咸宁市| 唐海县|