BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          java多線程設計模式詳解之三

          Posted on 2007-11-09 01:38 dybjsun 閱讀(194) 評論(0)  編輯  收藏 所屬分類: 多線程主題
            前面談了多線程應用程序能極大地改善用戶相應。例如對于一個Web應用程序,每當一個用戶請求服務器連接時,服務器就可以啟動一個新線程為用戶服務。

            然而,創建和銷毀線程本身就有一定的開銷,內存開銷就不可忽略,垃圾收集器還必須負擔更多的工作。因此,線程池就是為了避免頻繁創建和銷毀線程。

            每當服務器接受了一個新的請求后,服務器就從線程池中挑選一個等待的線程并執行請求處理。處理完畢后,線程并不結束,而是轉為阻塞狀態再次被放入線程池中。這樣就避免了頻繁創建和銷毀線程。

            Worker Pattern實現了類似線程池的功能。首先定義Task接口:

          package com.crackj2ee.thread;
          public interface Task {
          void execute();
          }

            線程將負責執行execute()方法。注意到任務是由子類通過實現execute()方法實現的,線程本身并不知道自己執行的任務。它只負責運行一個耗時的execute()方法。

            具體任務由子類實現,我們定義了一個CalculateTask和一個TimerTask:

          // CalculateTask.java
          package com.crackj2ee.thread;
          public class CalculateTask implements Task {
          private static int count = 0;
          private int num = count;
          public CalculateTask() {
          count++;
          }
          public void execute() {
          System.out.println("[CalculateTask " + num + "] start...");
          try {
          Thread.sleep(3000);
          }
          catch(InterruptedException ie) {}
          System.out.println("[CalculateTask " + num + "] done.");
          }
          }

          // TimerTask.java
          package com.crackj2ee.thread;
          public class TimerTask implements Task {
          private static int count = 0;
          private int num = count;
          public TimerTask() {
          count++;
          }
          public void execute() {
          System.out.println("[TimerTask " + num + "] start...");
          try {
          Thread.sleep(2000);
          }
          catch(InterruptedException ie) {}
          System.out.println("[TimerTask " + num + "] done.");
          }
          }

            以上任務均簡單的sleep若干秒。

            TaskQueue實現了一個隊列,客戶端可以將請求放入隊列,服務器線程可以從隊列中取出任務:

          package com.crackj2ee.thread;
          import java.util.*;
          public class TaskQueue {
          private List queue = new LinkedList();
          public synchronized Task getTask() {
          while(queue.size()==0) {
          try {
          this.wait();
          }
          catch(InterruptedException ie) {
          return null;
          }
          }
          return (Task)queue.remove(0);
          }
          public synchronized void putTask(Task task) {
          queue.add(task);
          this.notifyAll();
          }
          }

            終于到了真正的WorkerThread,這是真正執行任務的服務器線程:

          package com.crackj2ee.thread;
          public class WorkerThread extends Thread {
          private static int count = 0;
          private boolean busy = false;
          private boolean stop = false;
          private TaskQueue queue;
          public WorkerThread(ThreadGroup group, TaskQueue queue) {
          super(group, "worker-" + count);
          count++;
          this.queue = queue;
          }
          public void shutdown() {
          stop = true;
          this.interrupt();
          try {
          this.join();
          }
          catch(InterruptedException ie) {}
          }
          public boolean isIdle() {
          return !busy;
          }
          public void run() {
          System.out.println(getName() + " start.");
          while(!stop) {
          Task task = queue.getTask();
          if(task!=null) {
          busy = true;
          task.execute();
          busy = false;
          }
          }
          System.out.println(getName() + " end.");
          }
          }

            前面已經講過,queue.getTask()是一個阻塞方法,服務器線程可能在此wait()一段時間。此外,WorkerThread還有一個shutdown方法,用于安全結束線程。

            最后是ThreadPool,負責管理所有的服務器線程,還可以動態增加和減少線程數:

          package com.crackj2ee.thread;
          import java.util.*;
          public class ThreadPool extends ThreadGroup {
          private List threads = new LinkedList();
          private TaskQueue queue;
          public ThreadPool(TaskQueue queue) {
          super("Thread-Pool");
          this.queue = queue;
          }
          public synchronized void addWorkerThread() {
          Thread t = new WorkerThread(this, queue);
          threads.add(t);
          t.start();
          }
          public synchronized void removeWorkerThread() {
          if(threads.size()>0) {
          WorkerThread t = (WorkerThread)threads.remove(0);
          t.shutdown();
          }
          }
          public synchronized void currentStatus() {
          System.out.println("-----------------------------------------------");
          System.out.println("Thread count = " + threads.size());
          Iterator it = threads.iterator();
          while(it.hasNext()) {
          WorkerThread t = (WorkerThread)it.next();
          System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
          }
          System.out.println("-----------------------------------------------");
          }
          }

            currentStatus()方法是為了方便調試,打印出所有線程的當前狀態。

            最后,Main負責完成main()方法:

          package com.crackj2ee.thread;
          public class Main {
          public static void main(String[] args) {
          TaskQueue queue = new TaskQueue();
          ThreadPool pool = new ThreadPool(queue);
          for(int i=0; i<10; i++) {
          queue.putTask(new CalculateTask());
          queue.putTask(new TimerTask());
          }
          pool.addWorkerThread();
          pool.addWorkerThread();
          doSleep(8000);
          pool.currentStatus();
          pool.addWorkerThread();
          pool.addWorkerThread();
          pool.addWorkerThread();
          pool.addWorkerThread();
          pool.addWorkerThread();
          doSleep(5000);
          pool.currentStatus();
          }
          private static void doSleep(long ms) {
          try {
          Thread.sleep(ms);
          }
          catch(InterruptedException ie) {}
          }
          }

            main()一開始放入了20個Task,然后動態添加了一些服務線程,并定期打印線程狀態,運行結果如下:

          worker-0 start.
          [CalculateTask 0] start...
          worker-1 start.
          [TimerTask 0] start...
          [TimerTask 0] done.
          [CalculateTask 1] start...
          [CalculateTask 0] done.
          [TimerTask 1] start...
          [CalculateTask 1] done.
          [CalculateTask 2] start...
          [TimerTask 1] done.
          [TimerTask 2] start...
          [TimerTask 2] done.
          [CalculateTask 3] start...
          -----------------------------------------------
          Thread count = 2
          worker-0: busy
          worker-1: busy
          -----------------------------------------------
          [CalculateTask 2] done.
          [TimerTask 3] start...
          worker-2 start.
          [CalculateTask 4] start...
          worker-3 start.
          [TimerTask 4] start...
          worker-4 start.
          [CalculateTask 5] start...
          worker-5 start.
          [TimerTask 5] start...
          worker-6 start.
          [CalculateTask 6] start...
          [CalculateTask 3] done.
          [TimerTask 6] start...
          [TimerTask 3] done.
          [CalculateTask 7] start...
          [TimerTask 4] done.
          [TimerTask 7] start...
          [TimerTask 5] done.
          [CalculateTask 8] start...
          [CalculateTask 4] done.
          [TimerTask 8] start...
          [CalculateTask 5] done.
          [CalculateTask 9] start...
          [CalculateTask 6] done.
          [TimerTask 9] start...
          [TimerTask 6] done.
          [TimerTask 7] done.
          -----------------------------------------------
          Thread count = 7
          worker-0: idle
          worker-1: busy
          worker-2: busy
          worker-3: idle
          worker-4: busy
          worker-5: busy
          worker-6: busy
          -----------------------------------------------
          [CalculateTask 7] done.
          [CalculateTask 8] done.
          [TimerTask 8] done.
          [TimerTask 9] done.
          [CalculateTask 9] done.

            仔細觀察:一開始只有兩個服務器線程,因此線程狀態都是忙,后來線程數增多,7個線程中的兩個狀態變成idle,說明處于wait()狀態。

            思考:本例的線程調度算法其實根本沒有,因為這個應用是圍繞TaskQueue設計的,不是以Thread Pool為中心設計的。因此,Task調度取決于TaskQueue的getTask()方法,你可以改進這個方法,例如使用優先隊列,使優先級高的任務先被執行。

            如果所有的服務器線程都處于busy狀態,則說明任務繁忙,TaskQueue的隊列越來越長,最終會導致服務器內存耗盡。因此,可以限制TaskQueue的等待任務數,超過最大長度就拒絕處理。許多Web服務器在用戶請求繁忙時就會拒絕用戶:HTTP 503 SERVICE UNAVAILABLE
          主站蜘蛛池模板: 腾冲县| 临泽县| 景洪市| 枣庄市| 温州市| 定襄县| 通许县| 高州市| 桐庐县| 高唐县| 祥云县| 八宿县| 宿州市| 琼结县| 青阳县| 勃利县| 义马市| 江津市| 苏州市| 盐边县| 肥乡县| 嘉禾县| 蒙城县| 镇康县| 苏尼特右旗| 崇阳县| 霍州市| 嘉荫县| 日土县| 开平市| 合江县| 乃东县| 略阳县| 洛隆县| 白玉县| 上思县| 陇川县| 东兰县| 黄山市| 崇明县| 彝良县|