[本文是我對(duì)Java Concurrency In Practice 7.1的歸納和總結(jié). ?轉(zhuǎn)載請(qǐng)注明作者和出處, ?如有謬誤, 歡迎在評(píng)論中指正. ]
并不是所有的阻塞都是可中斷的, 比如InputStream.read方法. 在檢測(cè)到輸入數(shù)據(jù)可用, 到達(dá)流末尾或者拋出異常前, 該方法一直阻塞. 而且阻塞的時(shí)候不會(huì)檢查中斷標(biāo)記, 所以中斷線程無(wú)法使read從阻塞狀態(tài)返回. 但是關(guān)閉流可以使得read方法拋出異常, 從而從阻塞狀態(tài)返回.?
public class ReaderThread extends Thread { private static final int BUFSZ = 1024; private final Socket socket; private final InputStream in; public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); } // 覆蓋Thread類(lèi)的interrupt方法, 加入關(guān)閉socket的代碼 // 如果發(fā)生中斷時(shí), 線程阻塞在read方法上, socket的關(guān)閉會(huì)導(dǎo)致read方法拋出SocketException, 然后run方法運(yùn)行完畢 public void interrupt() { try { socket.close(); } catch (IOException ignored) { } finally { super.interrupt(); } } public void run() { try { byte[] buf = new byte[BUFSZ]; while (true) { int count = in.read(buf); if (count < 0) break; else if (count > 0) processBuffer(buf, count); } } catch (IOException e) { /* Allow thread to exit */ } } private void processBuffer(byte[] buf, int count) { //... } }
如果task并非在自己創(chuàng)建的線程里運(yùn)行, 而是提交給線程池運(yùn)行的話, 就無(wú)法使用上例的方式處理不可中斷阻塞了. 之前有過(guò)分析, 對(duì)于提交給線程池執(zhí)行的task, 應(yīng)該通過(guò)Future.cancel方法提前終止task的運(yùn)行, 所以可以考慮重寫(xiě)Future.cancel方法, 在其中加入關(guān)閉socket的操作. Future對(duì)象是由submit方法返回的, 其源代碼如下:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }?
可知submit方法返回的Future對(duì)象是調(diào)用newTaskFor方法獲得的:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }?
newTaskFor方法被聲明為protected, 所以我們可以通過(guò)繼承覆蓋該方法, 返回自定義的Future對(duì)象.
首先將需要覆蓋的2個(gè)方法定義在接口中:
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } ?
然后讓task類(lèi)實(shí)現(xiàn)CancellableTask接口:
public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } public synchronized void cancel() { try { if (socket != null) socket.close(); } catch (IOException ignored) { } } public RunnableFuture<T> newTask() { return new FutureTask<T>(this) { // 定義FutureTask的匿名內(nèi)部類(lèi), 并覆蓋cancel方法, 向其中加入關(guān)閉socket的操作 public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } }
接著繼承ThreadPoolExecutor類(lèi)并覆蓋newTaskFor方法, 讓該方法返回自定義的FutureTask對(duì)象:
public class CancellingExecutor extends ThreadPoolExecutor { // ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { // 如果callable是CancellableTask對(duì)象, 那么就返回自定義的FutureTask(通過(guò)調(diào)用其newTaskFor方法實(shí)現(xiàn)) if (callable instanceof CancellableTask) return ((CancellableTask<T>) callable).newTask(); else return super.newTaskFor(callable); } }
測(cè)試代碼:
public static void main(String[] args) { CancellingExecutor executor = new CancellingExecutor(); SocketUsingTask task = new SocketUsingTask(); task.setSocket(new Socket("www.baidu.com", 80)); Future<V> future = executor.submit(task); future.cancel(true); }