我的家園

          我的家園

          [本文是我對(duì)Java Concurrency In Practice 7.1的歸納和總結(jié). ?轉(zhuǎn)載請(qǐng)注明作者和出處, ?如有謬誤, 歡迎在評(píng)論中指正. ]

          并不是所有的阻塞都是可中斷的, 比如InputStream.read方法. 在檢測(cè)到輸入數(shù)據(jù)可用, 到達(dá)流末尾或者拋出異常前, 該方法一直阻塞. 而且阻塞的時(shí)候不會(huì)檢查中斷標(biāo)記, 所以中斷線程無(wú)法使read從阻塞狀態(tài)返回. 但是關(guān)閉流可以使得read方法拋出異常, 從而從阻塞狀態(tài)返回.?

          public class ReaderThread extends Thread {
          	private static final int BUFSZ = 1024;
          	private final Socket socket;
          	private final InputStream in;
          
          	public ReaderThread(Socket socket) throws IOException {
          		this.socket = socket;
          		this.in = socket.getInputStream();
          	}
          
          	// 覆蓋Thread類(lèi)的interrupt方法, 加入關(guān)閉socket的代碼
          	// 如果發(fā)生中斷時(shí), 線程阻塞在read方法上, socket的關(guān)閉會(huì)導(dǎo)致read方法拋出SocketException, 然后run方法運(yùn)行完畢
          	public void interrupt() {
          		try {
          			socket.close();
          		} catch (IOException ignored) {
          		} finally {
          			super.interrupt();
          		}
          	}
          
          	public void run() {
          		try {
          			byte[] buf = new byte[BUFSZ];
          			while (true) {
          				int count = in.read(buf);
          				if (count < 0)
          					break;
          				else if (count > 0)
          					processBuffer(buf, count);
          			}
          		} catch (IOException e) { /*  Allow thread to exit  */
          		}
          	}
          
          	private void processBuffer(byte[] buf, int count) {
          		//...
          	}
          }

          如果task并非在自己創(chuàng)建的線程里運(yùn)行, 而是提交給線程池運(yùn)行的話, 就無(wú)法使用上例的方式處理不可中斷阻塞了. 之前有過(guò)分析, 對(duì)于提交給線程池執(zhí)行的task, 應(yīng)該通過(guò)Future.cancel方法提前終止task的運(yùn)行, 所以可以考慮重寫(xiě)Future.cancel方法, 在其中加入關(guān)閉socket的操作. Future對(duì)象是由submit方法返回的, 其源代碼如下:

          public <T> Future<T> submit(Callable<T> task) {
          	if (task == null) 
          		throw new NullPointerException();
          	RunnableFuture<T> ftask = newTaskFor(task);
          	execute(ftask);
          	return ftask;
          }?

          可知submit方法返回的Future對(duì)象是調(diào)用newTaskFor方法獲得的:

          protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
          	return new FutureTask<T>(callable);
          }?

          newTaskFor方法被聲明為protected, 所以我們可以通過(guò)繼承覆蓋該方法, 返回自定義的Future對(duì)象.

          首先將需要覆蓋的2個(gè)方法定義在接口中:

          public interface CancellableTask<T> extends Callable<T> { 
          	void cancel(); 
          	RunnableFuture<T> newTask(); 
          } ?

          然后讓task類(lèi)實(shí)現(xiàn)CancellableTask接口:

          public abstract class SocketUsingTask<T> implements CancellableTask<T> {
          	private Socket socket;
          
          	protected synchronized void setSocket(Socket s) {
          		socket = s;
          	}
          
          	public synchronized void cancel() {
          		try {
          			if (socket != null)
          				socket.close();
          		} catch (IOException ignored) {
          		}
          	}
          
          	public RunnableFuture<T> newTask() {
          		return new FutureTask<T>(this) {
          			// 定義FutureTask的匿名內(nèi)部類(lèi), 并覆蓋cancel方法, 向其中加入關(guān)閉socket的操作
          			public boolean cancel(boolean mayInterruptIfRunning) {
          				try {
          					SocketUsingTask.this.cancel();
          				} finally {
          					return super.cancel(mayInterruptIfRunning);
          				}
          			}
          		};
          	}
          }

          接著繼承ThreadPoolExecutor類(lèi)并覆蓋newTaskFor方法, 讓該方法返回自定義的FutureTask對(duì)象:

          public class CancellingExecutor extends ThreadPoolExecutor {
          	// ...
          	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
          		// 如果callable是CancellableTask對(duì)象, 那么就返回自定義的FutureTask(通過(guò)調(diào)用其newTaskFor方法實(shí)現(xiàn))
          		if (callable instanceof CancellableTask)
          			return ((CancellableTask<T>) callable).newTask();
          		else
          			return super.newTaskFor(callable);
          	}
          }

          測(cè)試代碼:

          public static void main(String[] args) {
          	CancellingExecutor executor = new CancellingExecutor();
          	SocketUsingTask task = new SocketUsingTask();
          	task.setSocket(new Socket("www.baidu.com", 80));
          	Future<V> future = executor.submit(task);
          	future.cancel(true);
          }





          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 纳雍县| 涡阳县| 应城市| 高淳县| 湘阴县| 孝昌县| 拜泉县| 兴和县| 阆中市| 舟山市| 福清市| 金阳县| 延吉市| 利津县| 通州区| 白银市| 乾安县| 广灵县| 福贡县| 霍林郭勒市| 蒙山县| 察雅县| 玛曲县| 霸州市| 莒南县| 阿坝| 安庆市| 江都市| 神池县| 新密市| 洱源县| 博爱县| 平阳县| 榆林市| 延津县| 东辽县| 靖边县| 安达市| 新邵县| 元氏县| 资溪县|