細心!用心!耐心!

          吾非文人,乃市井一俗人也,讀百卷書,跨江河千里,故申城一游; 一兩滴辛酸,三四年學業,五六點粗墨,七八筆買賣,九十道人情。

          BlogJava 聯系 聚合 管理
            1 Posts :: 196 Stories :: 10 Comments :: 0 Trackbacks

          多線程 — Concurrent包簡介

          Util.concurrent工具包概述
          Doug Lea
          State University of New York at Oswego
          dl@cs.oswego.edu

          http://gee.cs.oswego.edu

          翻譯:

          Cocia Lin(cocia@163.com)

          Huihoo.org

          原文

          http://gee.cs.oswego.edu/dl/cpjslides/util.pdf

          要點
          –目標和結構

          –主要的接口和實現

          Sync:獲得/釋放(acquire/release) 協議

          Channel:放置/取走(put/take) 協議

          Executor:執行Runnable任務

          –每一個部分都有一些關聯的接口和支持類

          –簡單的涉及其他的類和特性

          目標
          –一些簡單的接口

          -但是覆蓋大部分程序員需要小心處理代碼的問題

          – 高質量實現

          -正確的,保守的,有效率的,可移植的

          –可能作為將來標準的基礎

          -獲取經驗和收集反饋信息

          Sync
          – acquire/release協議的主要接口

          -用來定制鎖,資源管理,其他的同步用途

          - 高層抽象接口

          - 沒有區分不同的加鎖用法

          –實現

          -Mutex, ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore, PrioritySemaphore

          n 還有,有幾個簡單的實現,例如ObservableSync, LayeredSync

          獨占鎖
          try {

          lock.acquire();

          try {

          action();

          }

          finally {

          lock.release();

          }

          }

          catch (InterruptedException ie) { ... }

          – Java同步塊不適用的時候使用它

          - 超時,回退(back-off)

          - 確保可中斷

          - 大量迅速鎖定

          - 創建Posix風格應用(condvar)

          獨占例子
          class ParticleUsingMutex {

          int x; int y;

          final Random rng = new Random();

          final Mutex mutex = new Mutex();

          public void move() {

          try {

          mutex.acquire();

          try { x += rng.nextInt(2)-1; y += rng.nextInt(2)-1; }

          finally { mutex.release(); }

          }

          catch (InterruptedException ie) {

          Thread.currentThread().interrupt(); }

          }

          public void draw(Graphics g) {

          int lx, ly;

          try {

          mutex.acquire();

          try { lx = x; ly = y; }

          finally { mutex.release(); }

          }

          catch (InterruptedException ie) {

          Thread.currentThread().interrupt(); return; }

          g.drawRect(lx, ly, 10, 10);

          }

          }

          回退(Backoff)例子
          class CellUsingBackoff {

          private long val;

          private final Mutex mutex = new Mutex();

          void swapVal(CellUsingBackoff other)

          throws InterruptedException {

          if (this == other) return; // alias check

          for (;;) {

          mutex.acquire();

          try {

          I f (other.mutex.attempt(0)) {

          try {

          long t = val;

          val = other.val;

          other.val = t;

          return;

          }

          finally { other.mutex.release(); }

          }

          }

          finally { mutex.release(); };

          Thread.sleep(100); // heuristic retry interval

          }

          }

          }

          讀寫鎖
          interface ReadWriteLock {

          Sync readLock();

          Sync writeLock();

          }

          – 管理一對鎖

          - 和普通的鎖一樣的使用習慣

          – 對集合類很有用

          -半自動的方式實現SyncSet, SyncMap, …

          – 實現者使用不同的鎖策略

          - WriterPreference, ReentrantWriterPreference,

          ReaderPreference, FIFO

          ReadWriteLock例子
          – 示范在讀寫鎖中執行任何Runnable的包裝類

          class WithRWLock {

          final ReadWriteLock rw;

          public WithRWLock(ReadWriteLock l) { rw = l; }

          public void performRead(Runnable readCommand)

          throws InterruptedException {

          rw.readLock().acquire();

          try { readCommand.run(); }

          finally { rw.readlock().release(); }

          }

          public void performWrite(…) // similar

          }

          閉鎖(Latch)
          – 閉鎖是開始時設置為false,但一旦被設置為true,他將永遠保持true狀態

          - 初始化標志

          - 流結束定位

          - 線程中斷

          - 事件出發指示器

          – CountDown和他有點類似,不同的是,CountDown需要一定數量的觸發設置,而不是一次

          – 非常簡單,但是廣泛使用的類

          - 替換容易犯錯的開發代碼

          Latch Example 閉鎖例子
          class Worker implements Runnable {

          Latch startSignal;

          Worker(Latch l) { startSignal = l; }

          public void run() {

          startSignal.acquire();

          // … doWork();

          }

          }

          class Driver { // …

          void main() {

          Latch ss = new Latch();

          for (int i = 0; i < N; ++i) // make threads

          new Thread(new Worker(ss)).start();

          doSomethingElse(); // don’t let run yet

          ss.release(); // now let all threads proceed

          }

          }

          信號(Semaphores)
          -- 服務于數量有限的占有者

          - 使用許可數量構造對象(通常是0)

          - 如果需要一個許可才能獲取,等待,然后取走一個許可

          - 釋放的時候將許可添加回來

          -- 但是真正的許可并沒有轉移(But no actual permits change hands.)

          - 信號量僅僅保留當前的計數值

          -- 應用程序

          - 鎖:一個信號量可以被用作互斥體(mutex)

          - 一個獨立的等待緩存或者資源控制的操作

          - 設計系統是想忽略底層的系統信號

          -- (phores ‘remember’ past signals)記住已經消失的信號量

          信號量例子
          class Pool {

          ArrayList items = new ArrayList();

          HashSet busy = new HashSet();

          final Semaphore available;

          public Pool(int n) {

          available = new Semaphore(n);

          // … somehow initialize n items …;

          }

          public Object getItem() throws InterruptedException {

          available.acquire();

          return doGet();

          }

          public void returnItem(Object x) {

          if (doReturn(x)) available.release();

          }

          synchronized Object doGet() {

          Object x = items.remove(items.size()-1);

          busy.add(x); // put in set to check returns

          return x;

          }

          synchronized boolean doReturn(Object x) {

          return busy.remove(x); // true if was present

          }

          }

          屏障(Barrier)
          – 多部分同步接口

          - 每一部分都必須等待其他的分不撞倒屏障

          – CyclicBarrier類

          - CountDown的一個可以重新設置的版本

          - 對于反復劃分算法很有用(iterative partitioning algorithms)

          – Rendezvous類

          - 一個每部分都能夠和其他部分交換信息的屏障

          - 行為類似同時的在一個同步通道上put和take

          - 對于資源交換協議很有用(resource-exchange protocols)

          通道(Channel)
          –為緩沖,隊列等服務的主接口

          – 具體實現

          - LinkedQueue, BoundedLinkedQueue,BoundedBuffer, BoundedPriorityQueue,SynchronousChannel, Slot

          通道屬性
          – 被定義為Puttable和Takable的子接口

          - 允許安裝生產者/消費者模式執行

          – 支持可超時的操作offer和poll

          - 當超時值是0時,可能會被阻塞

          - 所有的方法能夠拋出InterruptedException異常

          – 沒有接口需要size方法

          - 但是一些實現定義了這個方法

          - BoundedChannel有capacity方法

          通道例子
          class Service { // …

          final Channel msgQ = new LinkedQueue();

          public void serve() throws InterruptedException {

          String status = doService();

          msgQ.put(status);

          }

          public Service() { // start background thread

          Runnable logger = new Runnable() {

          public void run() {

          try {

          for(;;)

          System.out.println(msqQ.take());

          }

          catch(InterruptedException ie) {} }

          };

          new Thread(logger).start();

          }

          }

          運行器(Executor)
          – 類似線程的類的主接口

          - 線程池

          - 輕量級運行框架

          - 可以定制調度算法

          – 只需要支持execute(Runnable r)

          - 同Thread.start類似

          – 實現

          - PooledExecutor, ThreadedExecutor,QueuedExecutor, FJTaskRunnerGroup

          - 相關的ThreadFactory類允許大多數的運行器通過定制屬性使用線程

          PooledExecutor
          – 一個可調的工作者線程池,可修改得屬性如下:

          - 任務隊列的類型

          - 最大線程數

          - 最小線程數

          - 預熱(預分配)和立即(分配)線程

          - 保持活躍直到工作線程結束

          – 以后如果需要可能被一個新的代替

          - 飽和(Saturation)協議

          – 阻塞,丟棄,生產者運行,等等

          PooledExecutor例子
          class WebService {

          public static void main(String[] args) {

          PooledExecutor pool =

          new PooledExecutor(new BoundedBuffer(10), 20);

          pool.createThreads(4);

          try {

          ServerSocket socket = new ServerSocket(9999);

          for (;;) {

          final Socket connection = socket.accept();

          pool.execute(new Runnable() {

          public void run() {

          new Handler().process(connection);

          }});

          }

          }

          catch(Exception e) { } // die

          }

          }

          class Handler { void process(Socket s); }

          前景(Future)和可調用(Callable)
          – Callabe是類似于Runnable的接口,用來作為參數和傳遞結果

          interface Callable {

          Object call(Object arg) throws Exception;

          }

          – FutureResult管理Callable的異步執行

          class FutureResult { // …

          // block caller until result is ready

          public Object get()

          throws InterruptedException, InvocationTargetException;

          public void set(Object result); // unblocks get

          // create Runnable that can be used with an Executor

          public Runnable setter(Callable function);

          }

          FutureResult例子
          class ImageRenderer { Image render(byte[] raw); }

          class App { // …

          Executor executor = …; // any executor

          ImageRenderer renderer = new ImageRenderer();

          public void display(byte[] rawimage) {

          try {

          FutureResult futureImage = new FutureResult();

          Runnable cmd = futureImage.setter(new Callable(){

          public Object call() {

          return renderer.render(rawImage);

          }});

          executor.execute(cmd);

          drawBorders(); // do other things while executing

          drawCaption();

          drawImage((Image)(futureImage.get())); // use future

          }

          catch (Exception ex) {

          cleanup();

          return;

          }

          }

          }

          其他的類
          – CopyOnWriteArrayList

          - 支持整個集合復制時每一個修改的無鎖訪問

          - 適合大多數的多路廣播應用程序

          – 工具包還包括了一個java.beans多路廣播類的COW版本

          – SynchronizedDouble, SynchronizedInt,SynchronizedRef, etc

          - 類似于java.lang.Double,提供可變操作的同步版本.例如,addTo,inc

          - 添加了一些象swap,commit這樣的實用操作

          未來計劃
          – 并發數據構架

          - 一組繁重線程連接環境下有用的工具集合

          –支持側重I/O的程序

          - 事件機制的IO系統

          – 小版本的實現

          - 例如SingleSourceQueue

          –小幅度的改善

          - 使運行器更容易使用

          – 替換

          - JDK1.3 java.util.Timer 被ClockDaemon取代

          posted on 2007-01-31 13:08 張金鵬 閱讀(1181) 評論(0)  編輯  收藏 所屬分類: 多線程編程
          主站蜘蛛池模板: 手游| 即墨市| 万年县| 景洪市| 石渠县| 瓦房店市| 永登县| 贡嘎县| 繁峙县| 长岛县| 禄丰县| 伊宁县| 涿州市| 黑龙江省| 庆云县| 垦利县| 来凤县| 雅安市| 龙川县| 宜城市| 萍乡市| 盐城市| 房产| 江达县| 岳阳市| 漠河县| 会宁县| 家居| 新丰县| 卢龙县| 浦北县| 宝清县| 溆浦县| 新巴尔虎右旗| 大城县| 乐清市| 冕宁县| 亚东县| 东城区| 镇赉县| 东宁县|