我的家園

          我的家園

          配置ThreadPoolExecutor

          Posted on 2012-04-15 16:26 zljpp 閱讀(768) 評論(0)  編輯  收藏

          [本文是我對Java Concurrency In Practice C08的歸納和總結(jié). ?轉(zhuǎn)載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]

          Executors的靜態(tài)方法newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool所返回的線程池都是ThreadPoolExecutor對象或者其子類對象. ThreadPoolExecutor提供了多種配置, 可以根據(jù)實際定制合適的線程池.

          ?

          線程的創(chuàng)建和銷毀

          ThreadPoolExecutor構(gòu)造函數(shù)中的corePoolSize, maximumPoolSize, keepAliveTime參數(shù)與線程的創(chuàng)建和銷毀相關(guān).?

          corePoolSize指定ThreadPoolExecutor中持有的核心線程數(shù), 除非task隊列已滿, ThreadPoolExecutor不會創(chuàng)建超過核心線程數(shù)的線程(corePoolSize為0時是一種特殊情況, 此時就算task隊列沒有飽和, 向線程池第一次提交task時仍然會創(chuàng)建新的線程), 核心線程一旦創(chuàng)建就不會銷毀, 除非設(shè)置了allowCoreThreadTimeOut(true), 或者關(guān)閉線程池.

          maximumPoolSize指定線程池中持有的最大線程數(shù). 對于超過核心線程數(shù)的線程, 如果在指定的超時時間內(nèi)沒有使用到, 就會被銷毀.

          keepAliveTime指定超時時間.

          Executors類的靜態(tài)方法創(chuàng)建線程池的源碼:

          public static ExecutorService newCachedThreadPool() {
          	// 核心線程數(shù)為0, 最大線程數(shù)為Integer.MAX_VALUE, 超時時間為60s
          	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
          }
          
          public static ExecutorService newFixedThreadPool(int nThreads) {
          	// 核心線程數(shù)和最大線程數(shù)都為調(diào)用方指定的值nThreads, 超時時間為0
          	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
          			new LinkedBlockingQueue<Runnable>());
          }
          
          public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
          	// 核心線程數(shù)由調(diào)用方指定, 最大線程數(shù)為Integer.MAX_VALUE, 超時時間為0
          	return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue());
          }?

          ?

          task隊列

          線程池內(nèi)部持有一個task隊列, 當task的提交速度超過task的執(zhí)行速度時, task將被緩存在task隊列中等待有線程可用時再執(zhí)行. ThreadPoolExecutor在創(chuàng)建時可以為其指定task隊列, 開發(fā)者一般有三種選擇: 有界隊列, 無界隊列以及同步隊列. Executors.newFixedThreadPool和Executors.newScheduledThreadPool返回的ThreadPoolExecutor對象使用的是無界隊列, 而Executors.newCashedThreadPool返回的ThreadPoolExecutor對象使用的是同步隊列.

          為線程數(shù)不多的線程池指定一個容量大的隊列(或者無界隊列), 有助于減少線程間切換, CPU等方面的消耗, 代價是可能會造成吞吐量下降. 如果使用的是有界隊列, 隊列可能會被填滿, 此時將根據(jù)指定的飽和策略進行處理(見之后的講述).

          對于線程數(shù)很大的線程池, 可以使用同步隊列. 同步隊列(SynchronousQueue)其實不能算是一種隊列, 因為同步隊列沒有緩存的作用. 使用同步隊列時, task被提交時, 直接由線程池中的線程接手. 如果此時線程池中沒有可用的線程, 線程池將創(chuàng)建新的線程接手. 如果線程池無法創(chuàng)建新的線程(比如線程數(shù)已到達maximumPoolSize), 則根據(jù)指定的飽和策略進行處理(同樣見之后的講述).

          ?

          飽和策略

          如果線程池使用的是有界隊列, 那么當有界隊列滿時繼續(xù)提交task時飽和策略會被觸發(fā).

          如果線程池使用的是同步隊列, 那么當線程池無法創(chuàng)建新的線程接手task時飽和策略會被觸發(fā).

          如果線程池被關(guān)閉后, 仍然向其提交task時, 飽和策略也會被觸發(fā).

          ThreadPoolExecutor.setRejectedExecutionHandler方法用于設(shè)定飽和策略. 該方法接受一個RejectedExecutionHandler對象作為參數(shù). RejectedExecutionHandler只定義了一個方法:rejectedExecution(Runnable r, ThreadPoolExecutor executor). rejectedExecution方法在飽和策略被觸發(fā)時由系統(tǒng)回調(diào).

          ThreadPoolExecutor類中預(yù)定義了多個RejectedExecutionHandler的實現(xiàn)類: AbortPolicy, CallerRunsPolicy, DiscardPolicy, 和DiscardOldestPolicy.

          AbortPolicy是默認的飽和策略, 其rejectedExecution方法為:

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          	throw new RejectedExecutionException();
          }?

          可見默認情況下, 觸發(fā)飽和策略時將拋出RejectedExecutionException異常.

          CallerRunsPolicy. 飽和時將在提交task的線程中執(zhí)行task, 而不是由線程池中的線程執(zhí)行:

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          	if (!e.isShutdown()) {
          		r.run();
          	}
          }

          使用CallerRunsPolicy的例子:

          class LifecycleWebServer {
          	// MAX_THREAD_COUNT和MAX_QUEUE_COUNT的值根據(jù)系統(tǒng)的實際情況確定
          	private static final int MAX_THREAD_COUNT = 100;
          	private static final int MAX_QUEUE_COUNT = 1000;
          
          	// 使用有界隊列作為task隊列, 當有界隊列滿時, 將觸發(fā)飽和策略
          	private final ThreadPoolExecutor exec = new ThreadPoolExecutor(0, MAX_THREAD_COUNT, 60L, TimeUnit.SECONDS,
          			new ArrayBlockingQueue<Runnable>(MAX_QUEUE_COUNT));
          
          	public void start() throws IOException {
          		// 設(shè)置飽和策略為CallerRunsPolicy
          		exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          		ServerSocket socket = new ServerSocket(80);
          		while (!exec.isShutdown()) {
          			try {
          				final Socket conn = socket.accept();
          				exec.execute(new Runnable() {
          					public void run() {
          						handleRequest(conn);
          					}
          				});
          			} catch (RejectedExecutionException e) {
          				if (!exec.isShutdown())
          					log("task submission rejected", e);
          			}
          		}
          	}
          
          	public void stop() {
          		exec.shutdown();
          	}
          
          	void handleRequest(Socket connection) {
          		Request req = readRequest(connection);
          		if (isShutdownRequest(req))
          			stop();
          		else
          			dispatchRequest(req);
          	}
          	
          	public static void main(String[] args) {
          		LifecycleWebServer server = new LifecycleWebServer();
          		try {
          			// 在main線程中啟動server
          			server.start();
          		} catch (IOException e) {
          			e.printStackTrace();
          		}
          	}
          }?

          LifecycleWebServer中的線程池使用CallerRunsPolicy作為其飽和策略. 如果線程池飽和時main線程仍然向線程池提交task, 那么task將在main中執(zhí)行. main線程執(zhí)行task是需要一定時間的, 這樣就給了線程池喘息的機會, 而且main線程在執(zhí)行task的時間內(nèi)無法接受socket連接, 因此socket連接請求將緩存在tcp層. 如果server過載持續(xù)的時間較長, 使得tcp層的緩存不夠, 那么tcp緩存將根據(jù)其策略丟棄部分請求. 如此一來, 整個系統(tǒng)的過載壓力逐步向外擴散: 線程池-線程池中的隊列-main線程-tcp層-client. 這樣的系統(tǒng)在發(fā)生過載時是比較優(yōu)雅的: 既不會因為過多的請求而導致系統(tǒng)資源耗盡, 也不會一發(fā)生過載時就拒絕服務(wù), 只有發(fā)生長時間系統(tǒng)過載時才會出現(xiàn)客戶端無法連接的情況.

          DiscardPolicy. 該策略將最新提交的task丟棄:

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          	// 丟棄, 不做任何處理
          }?

          DiscardOldestPolicy. 該策略丟棄隊列中處于對頭的task, 且試著再次提交最新的task:

          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              if (!e.isShutdown()) {
          	e.getQueue().poll();
          	e.execute(r);
              }
          }?

          DiscardOldestPolicy與PriorityBlockingQueue結(jié)合使用時可能會造成不好的結(jié)果, 因為PriorityBlockingQueue中位于對頭的task是優(yōu)先級最高的task, 發(fā)生飽和時反而首先丟棄優(yōu)先級高的task可能不符合需求.

          ThreadPoolExecutor沒有提供飽和時阻塞的策略, 不過開發(fā)者可以結(jié)合Semaphore實現(xiàn):

          public class BoundedExecutor {
          	private final Executor exec;
          	private final Semaphore semaphore;
          
          	public BoundedExecutor(Executor exec, int bound) {
          		this.exec = exec;
          		// 設(shè)定信號量permit的上限
          		this.semaphore = new Semaphore(bound);
          	}
          
          	public void submitTask(final Runnable command) throws InterruptedException {
          		// 提交task前先申請permit, 如果無法申請到permit, 調(diào)用submitTask的線程將被阻塞, 直到有permit可用
          		semaphore.acquire();
          		try {
          			exec.execute(new Runnable() {
          				public void run() {
          					try {
          						command.run();
          					} finally {
          						// 提交成功了, 運行task后釋放permit
          						semaphore.release();
          					}
          				}
          			});
          		} catch (RejectedExecutionException e) {
          			// 如果沒有提交成功, 也需要釋放permit
          			semaphore.release();
          		}
          	}
          }

          ?

          ThreadFactory

          在創(chuàng)建ThreadPoolExecutor時還可以為其指定ThreadFactory, 當線程池需要創(chuàng)建新的線程時會調(diào)用ThreadFactory的newThread方法. 默認的ThreadFactory創(chuàng)建的線程是nonDaemon, 線程優(yōu)先級為NORM_PRIORITY的線程, 并且為其指定了可識別的線程名稱:

          public Thread newThread(Runnable r) {
          	Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
          	if (t.isDaemon())
          		t.setDaemon(false);
          	if (t.getPriority() != Thread.NORM_PRIORITY)
          		t.setPriority(Thread.NORM_PRIORITY);
          	return t;
          }?

          開發(fā)者可以根據(jù)自身需要為ThreadPoolExecutor指定自定義的ThreadFactory. 例如:

          public class MyThreadFactory implements ThreadFactory {
          	private final String poolName;
          
          	public MyThreadFactory(String poolName) {
          		this.poolName = poolName;
          	}
          
          	public Thread newThread(Runnable runnable) {
          		return new MyAppThread(runnable, poolName);
          	}
          }
          
          public class MyAppThread extends Thread {
          	public static final String DEFAULT_NAME = "MyAppThread";
          	private static volatile boolean debugLifecycle = false;
          	private static final AtomicInteger created = new AtomicInteger();
          	private static final AtomicInteger alive = new AtomicInteger();
          	private static final Logger log = Logger.getAnonymousLogger();
          
          	public MyAppThread(Runnable r) {
          		this(r, DEFAULT_NAME);
          	}
          
          	public MyAppThread(Runnable runnable, String name) {
          		// 為自定義的Thread類指定線程名稱
          		super(runnable, name + "-" + created.incrementAndGet());
          		// 設(shè)置UncaughtExceptionHandler. UncaughtExceptionHandler的uncaughtException方法將在線程運行中拋出未捕獲異常時由系統(tǒng)調(diào)用
          		setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
          			public void uncaughtException(Thread t, Throwable e) {
          				log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
          			}
          		});
          	}
          
          	public void run() {
          		// Copy debug flag to ensure consistent value throughout. 
          		boolean debug = debugLifecycle;
          		if (debug)
          			log.log(Level.FINE, "Created " + getName());
          		try {
          			alive.incrementAndGet();
          			super.run();
          		} finally {
          			alive.decrementAndGet();
          			if (debug)
          				log.log(Level.FINE, "Exiting " + getName());
          		}
          	}
          
          	public static int getThreadsCreated() {
          		return created.get();
          	}
          
          	public static int getThreadsAlive() {
          		return alive.get();
          	}
          
          	public static boolean getDebug() {
          		return debugLifecycle;
          	}
          
          	public static void setDebug(boolean b) {
          		debugLifecycle = b;
          	}
          }

          ?

          擴展ThreadPoolExecutor

          ThreadPoolExecutor類提供了多個"鉤子"方法, 以供其子類實現(xiàn), 比如beforeExecute, afterExecute, terminated等. 所謂"鉤子"是指基類預(yù)留的, 但是沒有提供具體實現(xiàn)的方法, 其方法體為空. 子類可以根據(jù)需要為"鉤子"提供具體實現(xiàn).

          beforeExecute和afterExecute方法分別在執(zhí)行task前后調(diào)用:

          private void runTask(Runnable task) {
          	final ReentrantLock runLock = this.runLock;
          	runLock.lock();
          	try {
          		if (runState < STOP && Thread.interrupted() && runState >= STOP)
          			thread.interrupt();
          		boolean ran = false;
          		beforeExecute(thread, task);
          		try {
          			task.run();
          			ran = true;
          			afterExecute(task, null);
          			++completedTasks;
          		} catch (RuntimeException ex) {
          			if (!ran)
          				afterExecute(task, ex);
          			throw ex;
          		}
          	} finally {
          		runLock.unlock();
          	}
          }?

          beforeExecute和afterExecute方法可以用于記錄日志, 統(tǒng)計數(shù)據(jù)等操作.

          terminated方法在線程池被關(guān)閉后調(diào)用. terminated方法可以用于釋放線程池申請的資源.

          ?






          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導航:
           
          主站蜘蛛池模板: 汝南县| 和政县| 佛山市| 屏山县| 贞丰县| 乐昌市| 宜兴市| 宜宾市| 汾阳市| 洪泽县| 曲沃县| 红桥区| 延边| 宁化县| 平远县| 渝中区| 巫溪县| 兰州市| 高台县| 阆中市| 邓州市| 东源县| 招远市| 唐海县| 凤庆县| 余姚市| 武安市| 宜阳县| 马龙县| 江西省| 新竹县| 宿州市| 华容县| 太仆寺旗| 尼玛县| 迭部县| 乌兰浩特市| 通渭县| 安塞县| 嘉义市| 民勤县|