莊周夢(mèng)蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          線程任務(wù)的取消

          Posted on 2007-09-03 15:50 dennis 閱讀(1810) 評(píng)論(0)  編輯  收藏 所屬分類: java
              當(dāng)外部代碼能夠在活動(dòng)自然完成之前,把它的狀態(tài)更改為完成狀態(tài),那么這個(gè)活動(dòng)被稱為可取消(cancellable)。取消任務(wù)是一個(gè)很常見的需求,無論是由于用戶請(qǐng)求還是系統(tǒng)錯(cuò)誤引起的服務(wù)關(guān)閉等等原因。最簡(jiǎn)單的任務(wù)取消策略就是在線程中維持一個(gè)bool變量,在run方法中判斷此變量的bool值來決定是否取消任務(wù)。顯然,這個(gè)bool變量需要聲明為volatile,以保持多線程環(huán)境下可見性(所謂可見性,就是當(dāng)一個(gè)線程修改共享對(duì)象的某個(gè)狀態(tài)變量后,另一個(gè)線程可以馬上看到修改結(jié)果)。下面是一個(gè)來自《java并發(fā)編程實(shí)踐》的例子:
          package net.rubyeye.concurrency.chapter7;

          import java.math.BigInteger;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.TimeUnit;

          public class PrimeGenerator implements Runnable {
              
          private final List<BigInteger> primes = new ArrayList<BigInteger>();

              
          private volatile boolean cancelled;
             
          public void run() {
                  BigInteger p 
          = BigInteger.ONE;
                  
          while (!cancelled) {
                      p 
          = p.nextProbablePrime();
                      
          synchronized (this) {
                          primes.add(p);
                      }
                  }
              }
             
          public void cancel() {
                  cancelled 
          = true;
              }
             
          public synchronized List<BigInteger> get() {
                  
          return new ArrayList<BigInteger>(primes);
              }

             
          public static void main(String args[]) throws InterruptedException {
                  PrimeGenerator generator 
          = new PrimeGenerator();
                  
          new Thread(generator).start();
                  
          try {
                      TimeUnit.SECONDS.sleep(
          1);
                  } 
          finally {
                      generator.cancel();
                  }
              }
          }
              main中啟動(dòng)一個(gè)素?cái)?shù)生成的任務(wù),線程運(yùn)行一秒就取消掉。通過線程中的cancelled變量來表征任務(wù)是否繼續(xù)執(zhí)行。既然是最簡(jiǎn)單的策略,那么什么是例外情況?顯然,阻塞操作下(比如調(diào)用join,wait,sleep方法),這樣的策略會(huì)出問題。任務(wù)因?yàn)檎{(diào)用這些阻塞方法而被阻塞,它將不會(huì)去檢查volatile變量,導(dǎo)致取消操作失效。那么解決辦法是什么?中斷!考慮我們用BlockingQueue去保存生成的素?cái)?shù),BlockingQueue的put方法是阻塞的(當(dāng)BlockingQueue滿的時(shí)候,put操作會(huì)阻塞直到有元素被take),讓我們看看不采用中斷,仍然采用簡(jiǎn)單策略會(huì)出現(xiàn)什么情況:
          package net.rubyeye.concurrency.chapter7;

          import java.math.BigInteger;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.TimeUnit;

          public class BrokenPrimeProducer extends Thread {
              
          static int i = 1000;

              
          private final BlockingQueue<BigInteger> queue;

              
          private volatile boolean cancelled = false;

              BrokenPrimeProducer(BlockingQueue
          <BigInteger> queue) {
                  
          this.queue = queue;
              }

              
          public void run() {
                  BigInteger p 
          = BigInteger.ONE;
                  
          try {
                      
          while (!cancelled) {
                          p 
          = p.nextProbablePrime();
                          queue.put(p);
                      }
                  } 
          catch (InterruptedException cusumed) {
                  }
              }

              
          public void cancel() {
                  
          this.cancelled = false;
              }

              
          public static void main(String args[]) throws InterruptedException {
                  BlockingQueue
          <BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                          
          10);
                  BrokenPrimeProducer producer 
          = new BrokenPrimeProducer(queue);
                  producer.start();
                  
          try {
                      
          while (needMorePrimes())
                          queue.take();
                  } 
          finally {
                      producer.cancel();
                  }
              }

              
          public static boolean needMorePrimes() throws InterruptedException {
                  
          boolean result = true;
                  i
          --;
                  
          if (i == 0)
                      result 
          = false;
                  
          return result;
              }
          }
              我們?cè)趍ain中通過queue.take來消費(fèi)產(chǎn)生的素?cái)?shù)(雖然僅僅是取出扔掉),我們只消費(fèi)了1000個(gè)素?cái)?shù),然后嘗試取消產(chǎn)生素?cái)?shù)的任務(wù),很遺憾,取消不了,因?yàn)楫a(chǎn)生素?cái)?shù)的線程產(chǎn)生素?cái)?shù)的速度大于我們消費(fèi)的速度,我們?cè)谙M(fèi)1000后就停止消費(fèi)了,那么任務(wù)將被queue的put方法阻塞,永遠(yuǎn)也不會(huì)去判斷cancelled狀態(tài)變量,任務(wù)取消不了。正確的做法應(yīng)當(dāng)是使用中斷(interrupt):
          package net.rubyeye.concurrency.chapter7;

          import java.math.BigInteger;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.LinkedBlockingQueue;
          import java.util.concurrent.TimeUnit;

          public class PrimeProducer extends Thread {
              
          static int i = 1000;

              
          private final BlockingQueue<BigInteger> queue;

              
          private volatile boolean cancelled = false;

              PrimeProducer(BlockingQueue
          <BigInteger> queue) {
                  
          this.queue = queue;
              }

              
          public void run() {
                  BigInteger p 
          = BigInteger.ONE;
                  
          try {
                      
          while (!Thread.currentThread().isInterrupted()) {
                          p 
          = p.nextProbablePrime();
                          queue.put(p);
                      }
                  } 
          catch (InterruptedException cusumed) {
                  }
              }

              
          public void cancel() {
                  interrupt();
              }

              
          public static void main(String args[]) throws InterruptedException {
                  BlockingQueue
          <BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
                          
          10);
                  PrimeProducer producer 
          = new PrimeProducer(queue);
                  producer.start();
                  
          try {
                      
          while (needMorePrimes())
                          queue.take();
                  } 
          finally {
                      producer.cancel();
                  }
              }

              
          public static boolean needMorePrimes() throws InterruptedException {
                  
          boolean result = true;
                  i
          --;
                  
          if (i == 0)
                      result 
          = false;
                  
          return result;
              }
          }
             在run方法中,通過Thread的isInterrupted來判斷interrupt status是否已經(jīng)被修改,從而正確實(shí)現(xiàn)了任務(wù)的取消。關(guān)于interrupt,有一點(diǎn)需要特別說明,調(diào)用interrupt并不意味著必然停止目標(biāo)線程的正在進(jìn)行的工作,它僅僅是傳遞一個(gè)請(qǐng)求中斷的信號(hào)給目標(biāo)線程,目標(biāo)線程會(huì)在下一個(gè)方便的時(shí)刻中斷。而對(duì)于阻塞方法產(chǎn)生的InterruptedException的處理,兩種選擇:要么重新拋出讓上層代碼來處理,要么在catch塊中調(diào)用Thread的interrupt來保存中斷狀態(tài)。除非你確定要讓工作線程終止(如上所示代碼),否則不要僅僅是catch而不做任務(wù)處理工作(生吞了InterruptedException),更詳細(xì)可以參考這里。如果不清楚外部線程的中斷策略,生搬硬套地調(diào)用interrupt可能產(chǎn)生不可預(yù)料的后果,可參見書中7.1.4例子。

             另外一個(gè)取消任務(wù)的方法就是采用Future來管理任務(wù),這是JDK5引入的,用于管理任務(wù)的生命周期,處理異常等。比如調(diào)用ExecutorService的sumit方法會(huì)返回一個(gè)Future來描述任務(wù),而Future有一個(gè)cancel方法用于取消任務(wù)。
             那么,如果任務(wù)調(diào)用了不可中斷的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么該怎么處理呢?簡(jiǎn)單地,關(guān)閉它們!參考下面的例子:
          package net.rubyeye.concurrency.chapter7;

          import java.io.IOException;
          import java.io.InputStream;
          import java.net.Socket;

          /**
           * 展示對(duì)于不可中斷阻塞的取消任務(wù) 通過關(guān)閉socket引發(fā)異常來中斷
           * 
           * 
          @author Admin
           * 
           
          */
          public abstract class ReaderThread extends Thread {
              
          private final Socket socket;

              
          private final InputStream in;

              
          public ReaderThread(Socket socket) throws IOException {
                  
          this.socket = socket;
                  
          this.in = socket.getInputStream();
              }

              
          // 重寫interrupt方法
              public void interrupt() {
                  
          try {
                      socket.close();
                  } 
          catch (IOException e) {
                  } 
          finally {
                      
          super.interrupt();
                  }
              }

              
          public void run() {
                  
          try {
                      
          byte[] buf = new byte[1024];
                      
          while (true) {
                          
          int count = in.read(buf);
                          
          if (count < 0)
                              
          break;
                          
          else if (count > 0)
                              processBuff(buf, count);
                      }
                  } 
          catch (IOException e) {
                  }
              }

              
          public abstract void processBuff(byte[] buf, int count);
          }
              Reader線程重寫了interrupt方法,其中調(diào)用了socket的close方法用于中斷read方法,最后,又調(diào)用了super.interrupt(),防止當(dāng)調(diào)用可中斷的阻塞方法時(shí)不能正常中斷。


          主站蜘蛛池模板: 宜君县| 安西县| 阳西县| 苍梧县| 甘洛县| 泉州市| 彭阳县| 诸城市| 观塘区| 枣庄市| 深泽县| 江西省| 宜春市| 博罗县| 桃江县| 沈丘县| 丽水市| 佳木斯市| 隆回县| 大关县| 新蔡县| 桐庐县| 西乡县| 廉江市| 上虞市| 临江市| 深水埗区| 黔东| 定边县| 汤原县| 申扎县| 庆元县| 陆丰市| 西藏| 北安市| 姜堰市| 南雄市| 察哈| 河西区| 垣曲县| 双江|