Chan Chen Coding...

          notify() and wait()

          通常,多線程之間需要協(xié)調(diào)工作。例如,瀏覽器的一個(gè)顯示圖片的線程displayThread想要執(zhí)行顯示圖片的任務(wù),必須等待下載線程downloadThread將該圖片下載完畢。如果圖片還沒有下載完,displayThread可以暫停,當(dāng)downloadThread完成了任務(wù)后,再通知displayThread“圖片準(zhǔn)備完畢,可以顯示了”,這時(shí),displayThread繼續(xù)執(zhí)行。

          以上邏輯簡單的說就是:如果條件不滿足,則等待。當(dāng)條件滿足時(shí),等待該條件的線程將被喚醒。在Java中,這個(gè)機(jī)制的實(shí)現(xiàn)依賴于wait/notify。等待機(jī)制與鎖機(jī)制是密切關(guān)聯(lián)的。例如:

          1
          2
          3
          4
          5
          6
          synchronized(obj) {
              while(!condition) {
                  obj.wait();
              }
              obj.doSomething();
          }

          當(dāng)線程A獲得了obj鎖后,發(fā)現(xiàn)條件condition不滿足,無法繼續(xù)下一處理,于是線程A就wait()。

          在另一線程B中,如果B更改了某些條件,使得線程A的condition條件滿足了,就可以喚醒線程A:

          1
          2
          3
          4
                synchronized(obj) {
                     condition = true;
                     obj.notify();
          }

          需要注意的概念是:

          # 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) {...} 代碼段內(nèi)。

          # 調(diào)用obj.wait()后,線程A就釋放了obj的鎖,否則線程B無法獲得obj鎖,也就無法在synchronized(obj) {...} 代碼段內(nèi)喚醒A。

          # 當(dāng)obj.wait()方法返回后,線程A需要再次獲得obj鎖,才能繼續(xù)執(zhí)行。

          # 如果A1,A2,A3都在obj.wait(),則B調(diào)用obj.notify()只能喚醒A1,A2,A3中的一個(gè)(具體哪一個(gè)由JVM決定)。

          # obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續(xù)執(zhí)行obj.wait()的下一條語句,必須獲得obj鎖,因此,A1,A2,A3只有一個(gè)有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續(xù)執(zhí)行。

          # 當(dāng)B調(diào)用obj.notify/notifyAll的時(shí)候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個(gè)才有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行。

          wait()/sleep()的區(qū)別

          前面講了wait/notify機(jī)制,Thread還有一個(gè)sleep()靜態(tài)方法,它也能使線程暫停一段時(shí)間。sleep與wait的不同點(diǎn)是:sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會(huì)使線程進(jìn)入obj對象的等待集合中并等待喚醒。

          但是wait()和sleep()都可以通過interrupt()方法打斷線程的暫停狀態(tài),從而使線程立刻拋出InterruptedException。

          如果線程A希望立即結(jié)束線程B,則可以對線程B對應(yīng)的Thread實(shí)例調(diào)用interrupt方法。如果此刻線程B正在wait/sleep/join,則線程B會(huì)立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結(jié)束線程。

          需要注意的是,InterruptedException是線程自己從內(nèi)部拋出的,并不是interrupt()方法拋出的。對某一線程調(diào)用interrupt()時(shí),如果該線程正在執(zhí)行普通的代碼,那么該線程根本就不會(huì)拋出InterruptedException。但是,一旦該線程進(jìn)入到wait()/sleep()/join()后,就會(huì)立刻拋出InterruptedException。

          GuardedSuspention

          GuardedSuspention模式主要思想是:

          當(dāng)條件不滿足時(shí),線程等待,直到條件滿足時(shí),等待該條件的線程被喚醒。

          我們設(shè)計(jì)一個(gè)客戶端線程和一個(gè)服務(wù)器線程,客戶端線程不斷發(fā)送請求給服務(wù)器線程,服務(wù)器線程不斷處理請求。當(dāng)請求隊(duì)列為空時(shí),服務(wù)器線程就必須等待,直到客戶端發(fā)送了請求。

          先定義一個(gè)請求隊(duì)列:Queue

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          package me.luger.Thread;

          import java.util.*;

          public class Queue {
              private List queue = new LinkedList();

              public synchronized Request getRequest() {
                  while (queue.size() == 0) {
                      try {
                          this.wait();
                      } catch (InterruptedException ie) {
                          return null;
                      }
                  }
                  return (Request) queue.remove(0);
              }

              public synchronized void putRequest(Request request) {
                  queue.add(request);
                  this.notifyAll();
              }

          }

          藍(lán)色部分就是服務(wù)器線程的等待條件,而客戶端線程在放入了一個(gè)request后,就使服務(wù)器線程等待條件滿足,于是喚醒服務(wù)器線程。

          客戶端線程:ClientThread

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          package me.luger.Thread;

          public class ClientThread extends Thread {
              private Queue queue;
              private String clientName;

              public ClientThread(Queue queue, String clientName) {
                  this.queue = queue;
                  this.clientName = clientName;
              }

              public String toString() {
                  return "[ClientThread-" + clientName + "]";
              }

              public void run() {
              for(int i=0; i<100; i++ ) {
                  Request request = new Request("" (long)(Math.random()*10000));
                  System.out.println(this " send request: " request);
                  queue.putRequest(request);
                  try {
                      Thread.sleep((long)(Math.random() * 10000 1000));
                  }
                      catch(InterruptedException ie) {
                  }
              }
              System.out.println(this " shutdown.");
              }
          }

          服務(wù)器線程:ServerThread

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          package com.crackj2ee.thread;

          public class ServerThread extends Thread {
              private boolean stop = false;
              private Queue queue;

              public ServerThread(Queue queue) {
                  this.queue = queue;
              }

              public void shutdown() {
                  stop = true;
                  this.interrupt();
                  try {
                      this.join();
                  } catch (InterruptedException ie) {
                  }
              }

              public void run() {
                      while(!stop) {
                          Request request = queue.getRequest();
                          System.out.println("[ServerThread] handle request: " request);
                          try {
                              Thread.sleep(2000);
                          }
                          catch(InterruptedException ie) {}
                      }
                      System.out.println("[ServerThread] shutdown.");
                  }
          }

          服務(wù)器線程在紅色部分可能會(huì)阻塞,也就是說,Queue.getRequest是一個(gè)阻塞方法。這和java標(biāo)準(zhǔn)庫的許多IO方法類似。

          最后,寫一個(gè)Main來啟動(dòng)他們:

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          package me.luger.thread;

          public class Main {

              public static void main(String[] args) {
                  Queue queue = new Queue();
                  ServerThread server = new ServerThread(queue);
                  server.start();
                  ClientThread[] clients = new ClientThread[5];
                  for(int i=0; i<CLIENTS.LENGTH; {
                      clients[i] = new ClientThread(queue, "" i);
                      clients[i].start();
                  }
                  try {
                      Thread.sleep(100000);
                  }
                  catch(InterruptedException ie) {}
                  server.shutdown();
              }
          }
          我們啟動(dòng)了5個(gè)客戶端線程和一個(gè)服務(wù)器線程,運(yùn)行結(jié)果如下:

          [ClientThread-0] send request: Request-4984
          [ServerThread] handle request: Request-4984
          [ClientThread-1] send request: Request-2020
          [ClientThread-2] send request: Request-8980
          [ClientThread-3] send request: Request-5044
          [ClientThread-4] send request: Request-548
          [ClientThread-4] send request: Request-6832
          [ServerThread] handle request: Request-2020
          [ServerThread] handle request: Request-8980
          [ServerThread] handle request: Request-5044
          [ServerThread] handle request: Request-548
          [ClientThread-4] send request: Request-1681
          [ClientThread-0] send request: Request-7859
          [ClientThread-3] send request: Request-3926
          [ServerThread] handle request: Request-6832
          [ClientThread-2] send request: Request-9906
          ......

          可以觀察到ServerThread處理來自不同客戶端的請求。

          思考

          Q: 服務(wù)器線程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?

          A: 在這個(gè)例子中可以,因?yàn)榉?wù)器線程只有一個(gè)。但是,如果服務(wù)器線程有多個(gè)(例如Web應(yīng)用程序有多個(gè)線程處理并發(fā)請求,這非常普遍),就會(huì)造成嚴(yán)重問題。

          Q: 能否用sleep(1000)代替wait()?

          A: 絕對不可以。sleep()不會(huì)釋放鎖,因此sleep期間別的線程根本沒有辦法調(diào)用getRequest()和putRequest(),導(dǎo)致所有相關(guān)線程都被阻塞。

          Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?

          A: 不可以。因?yàn)閣hile()是測試queue,remove()是使用queue,兩者是一個(gè)原子操作,不能放在synchronized外面。

          總結(jié)

          多線程設(shè)計(jì)看似簡單,實(shí)際上必須非常仔細(xì)地考慮各種鎖定/同步的條件,稍不小心,就可能出錯(cuò)。并且,當(dāng)線程較少時(shí),很可能發(fā)現(xiàn)不了問題,一旦問題出現(xiàn)又難以調(diào)試。

          所幸的是,已有一些被驗(yàn)證過的模式可以供我們使用,我們會(huì)繼續(xù)介紹一些常用的多線程設(shè)計(jì)模式。



          -----------------------------------------------------
          Silence, the way to avoid many problems;
          Smile, the way to solve many problems;

          posted on 2012-11-03 11:18 Chan Chen 閱讀(206) 評(píng)論(0)  編輯  收藏 所屬分類: Scala / Java

          主站蜘蛛池模板: 汽车| 榆林市| 勃利县| 伊宁市| 平利县| 成武县| 邯郸市| 扶沟县| 泾阳县| 宝清县| 民权县| 城口县| 永兴县| 星子县| 望江县| 扎兰屯市| 讷河市| 木兰县| 耿马| 彭山县| 嘉禾县| 铁力市| 托克逊县| 云霄县| 和平区| 中江县| 东乡县| 永宁县| 衡东县| 图片| 泗阳县| 遂宁市| 临夏市| 内乡县| 南江县| 金山区| 常德市| 怀集县| 西峡县| 咸宁市| 南雄市|