yxhxj2006

          常用鏈接

          統(tǒng)計(jì)

          最新評論

          Java 線程池的原理與實(shí)現(xiàn)

          這幾天主要是狂看源程序,在彌補(bǔ)了一些以前知識空白的同時,也學(xué)會了不少新的知識(比如 NIO),或者稱為新技術(shù)吧。
          線程池就是其中之一,一提到線程,我們會想到以前《操作系統(tǒng)》的生產(chǎn)者與消費(fèi)者,信號量,同步控制等等。
          一提到池,我們會想到數(shù)據(jù)庫連接池,但是線程池又如何呢?

          建議:在閱讀本文前,先理一理同步的知識,特別是syncronized同步關(guān)鍵字的用法。
          關(guān)于我對同步的認(rèn)識,要緣于大三年的一本書,書名好像是 Java 實(shí)戰(zhàn),這本書寫得實(shí)在太妙了,真正的從理論到實(shí)踐,從截圖分析到.class字節(jié)碼分析。哇,我想市場上很難買到這么精致的書了。作為一個Java愛好者,我覺得絕對值得一讀。
          我對此書印象最深之一的就是:equal()方法,由淺入深,經(jīng)典!
          還有就是同步了,其中提到了我的幾個編程誤區(qū),以前如何使用同步提高性能等等,通過學(xué)習(xí),使我對同步的認(rèn)識進(jìn)一步加深。
          --------------------------------------------------------------------------------------------------
          簡單介紹

              創(chuàng)建線程有兩種方式:繼承Thread或?qū)崿F(xiàn)Runnable。Thread實(shí)現(xiàn)了Runnable接口,提供了一個空的run()方法,所以不論是繼承Thread還是實(shí)現(xiàn)Runnable,都要有自己的run()方法。
              一個線程創(chuàng)建后就存在,調(diào)用start()方法就開始運(yùn)行(執(zhí)行run()方法),調(diào)用wait進(jìn)入等待或調(diào)用sleep進(jìn)入休眠期,順利運(yùn)行完畢或休眠被中斷或運(yùn)行過程中出現(xiàn)異常而退出。

          wait和sleep比較:
              sleep方法有:sleep(long millis),sleep(long millis, long nanos),調(diào)用sleep方法后,當(dāng)前線程進(jìn)入休眠期,暫停執(zhí)行,但該線程繼續(xù)擁有監(jiān)視資源的所有權(quán)。到達(dá)休眠時間后線程將繼續(xù)執(zhí)行,直到完成。若在休眠期另一線程中斷該線程,則該線程退出。
              wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),調(diào)用wait方法后,該線程放棄監(jiān)視資源的所有權(quán)進(jìn)入等待狀態(tài);
                wait():等待有其它的線程調(diào)用notify()或notifyAll()進(jìn)入調(diào)度狀態(tài),與其它線程共同爭奪監(jiān)視。wait()相當(dāng)于wait(0),wait(0, 0)。
                wait(long timeout):當(dāng)其它線程調(diào)用notify()或notifyAll(),或時間到達(dá)timeout亳秒,或有其它某線程中斷該線程,則該線程進(jìn)入調(diào)度狀態(tài)。
                wait(long timeout, long nanos):相當(dāng)于wait(1000000*timeout + nanos),只不過時間單位為納秒。
          ===================================================================================================

          線程池:
              多線程技術(shù)主要解決處理器單元內(nèi)多個線程執(zhí)行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。
              
              假設(shè)一個服務(wù)器完成一項(xiàng)任務(wù)所需時間為:T1 創(chuàng)建線程時間,T2 在線程中執(zhí)行任務(wù)的時間,T3 銷毀線程時間。
              
              如果:T1 + T3 遠(yuǎn)大于 T2,則可以采用線程池,以提高服務(wù)器性能。
                          一個線程池包括以下四個基本組成部分:
                          1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池,包括 創(chuàng)建線程池,銷毀線程池,添加新任務(wù);
                          2、工作線程(PoolWorker):線程池中線程,在沒有任務(wù)時處于等待狀態(tài),可以循環(huán)的執(zhí)行任務(wù);
                          3、任務(wù)接口(Task):每個任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的執(zhí)行,它主要規(guī)定了任務(wù)的入口,任務(wù)執(zhí)行完后的收尾工作,任務(wù)的執(zhí)行狀態(tài)等;
                          4、任務(wù)隊(duì)列(taskQueue):用于存放沒有處理的任務(wù)。提供一種緩沖機(jī)制。
                          
              線程池技術(shù)正是關(guān)注如何縮短或調(diào)整T1,T3時間的技術(shù),從而提高服務(wù)器程序性能的。它把T1,T3分別安排在服務(wù)器程序的啟動和結(jié)束的時間段或者一些空閑的時間段,這樣在服務(wù)器程序處理客戶請求時,不會有T1,T3的開銷了。

              線程池不僅調(diào)整T1,T3產(chǎn)生的時間段,而且它還顯著減少了創(chuàng)建線程的數(shù)目,看一個例子:

              假設(shè)一個服務(wù)器一天要處理50000個請求,并且每個請求需要一個單獨(dú)的線程完成。在線程池中,線程數(shù)一般是固定的,所以產(chǎn)生線程總數(shù)不會超過線程池中線程的數(shù)目,而如果服務(wù)器不利用線程池來處理這些請求則線程總數(shù)為50000。一般線程池大小是遠(yuǎn)小于50000。所以利用線程池的服務(wù)器程序不會為了創(chuàng)建50000而在處理請求時浪費(fèi)時間,從而提高效率。
          ------------------------------------------------------------------------------
          好了,廢話就到這里了,下面就是程序了,我也不講解了,注釋已經(jīng)很清晰了:

          /** 線程池類,工作線程作為其內(nèi)部類 **/

          package org.ymcn.util;

          import java.util.Collections;
          import java.util.Date;
          import java.util.LinkedList;
          import java.util.List;

          import org.apache.log4j.Logger;

          /**
          * 線程池
          * 創(chuàng)建線程池,銷毀線程池,添加新任務(wù)

          * @author obullxl
          */
          public final class ThreadPool {
              private static Logger logger = Logger.getLogger(ThreadPool.class);
              private static Logger taskLogger = Logger.getLogger("TaskLogger");

              private static boolean debug = taskLogger.isDebugEnabled();
              // private static boolean debug = taskLogger.isInfoEnabled();
              /* 單例 */
              private static ThreadPool instance = ThreadPool.getInstance();

              public static final int SYSTEM_BUSY_TASK_COUNT = 150;
              /* 默認(rèn)池中線程數(shù) */
              public static int worker_num = 5;
              /* 已經(jīng)處理的任務(wù)數(shù) */
              private static int taskCounter = 0;

              public static boolean systemIsBusy = false;

              private static List<Task> taskQueue = Collections
                      .synchronizedList(new LinkedList<Task>());
              /* 池中的所有線程 */
              public PoolWorker[] workers;

              private ThreadPool() {
                  workers = new PoolWorker[5];
                  for (int i = 0; i < workers.length; i++) {
                      workers[i] = new PoolWorker(i);
                  }
              }

              private ThreadPool(int pool_worker_num) {
                  worker_num = pool_worker_num;
                  workers = new PoolWorker[worker_num];
                  for (int i = 0; i < workers.length; i++) {
                      workers[i] = new PoolWorker(i);
                  }
              }

              public static synchronized ThreadPool getInstance() {
                  if (instance == null)
                      return new ThreadPool();
                  return instance;
              }
              /**
              * 增加新的任務(wù)
              * 每增加一個新任務(wù),都要喚醒任務(wù)隊(duì)列
              * @param newTask
              */
              public void addTask(Task newTask) {
                  synchronized (taskQueue) {
                      newTask.setTaskId(++taskCounter);
                      newTask.setSubmitTime(new Date());
                      taskQueue.add(newTask);
                      /* 喚醒隊(duì)列, 開始執(zhí)行 */
                      taskQueue.notifyAll();
                  }
                  logger.info("Submit Task<" + newTask.getTaskId() + ">: "
                          + newTask.info());
              }
              /**
              * 批量增加新任務(wù)
              * @param taskes
              */
              public void batchAddTask(Task[] taskes) {
                  if (taskes == null || taskes.length == 0) {
                      return;
                  }
                  synchronized (taskQueue) {
                      for (int i = 0; i < taskes.length; i++) {
                          if (taskes[i] == null) {
                              continue;
                          }
                          taskes[i].setTaskId(++taskCounter);
                          taskes[i].setSubmitTime(new Date());
                          taskQueue.add(taskes[i]);
                      }
                      /* 喚醒隊(duì)列, 開始執(zhí)行 */
                      taskQueue.notifyAll();
                  }
                  for (int i = 0; i < taskes.length; i++) {
                      if (taskes[i] == null) {
                          continue;
                      }
                      logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
                              + taskes[i].info());
                  }
              }
              /**
              * 線程池信息
              * @return
              */
              public String getInfo() {
                  StringBuffer sb = new StringBuffer();
                  sb.append("\nTask Queue Size:" + taskQueue.size());
                  for (int i = 0; i < workers.length; i++) {
                      sb.append("\nWorker " + i + " is "
                              + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
                  }
                  return sb.toString();
              }
              /**
              * 銷毀線程池
              */
              public synchronized void destroy() {
                  for (int i = 0; i < worker_num; i++) {
                      workers[i].stopWorker();
                      workers[i] = null;
                  }
                  taskQueue.clear();
              }

              /**
              * 池中工作線程
              * 
              * @author obullxl
              */
              private class PoolWorker extends Thread {
                  private int index = -1;
                  /* 該工作線程是否有效 */
                  private boolean isRunning = true;
                  /* 該工作線程是否可以執(zhí)行新任務(wù) */
                  private boolean isWaiting = true;

                  public PoolWorker(int index) {
                      this.index = index;
                      start();
                  }

                  public void stopWorker() {
                      this.isRunning = false;
                  }

                  public boolean isWaiting() {
                      return this.isWaiting;
                  }
                  /**
                  * 循環(huán)執(zhí)行任務(wù)
                  * 這也許是線程池的關(guān)鍵所在
                  */
                  public void run() {
                      while (isRunning) {
                          Task r = null;
                          synchronized (taskQueue) {
                              while (taskQueue.isEmpty()) {
                                  try {
                                      /* 任務(wù)隊(duì)列為空,則等待有新任務(wù)加入從而被喚醒 */
                                      taskQueue.wait(20);
                                  } catch (InterruptedException ie) {
                                      logger.error(ie);
                                  }
                              }
                              /* 取出任務(wù)執(zhí)行 */
                              r = (Task) taskQueue.remove(0);
                          }
                          if (r != null) {
                              isWaiting = false;
                              try {
                                  if (debug) {
                                      r.setBeginExceuteTime(new Date());
                                      taskLogger.debug("Worker<" + index
                                              + "> start execute Task<" + r.getTaskId() + ">");
                                      if (r.getBeginExceuteTime().getTime()
                                              - r.getSubmitTime().getTime() > 1000)
                                          taskLogger.debug("longer waiting time. "
                                                  + r.info() + ",<" + index + ">,time:"
                                                  + (r.getFinishTime().getTime() - r
                                                          .getBeginExceuteTime().getTime()));
                                  }
                                  /* 該任務(wù)是否需要立即執(zhí)行 */
                                  if (r.needExecuteImmediate()) {
                                      new Thread(r).start();
                                  } else {
                                      r.run();
                                  }
                                  if (debug) {
                                      r.setFinishTime(new Date());
                                      taskLogger.debug("Worker<" + index
                                              + "> finish task<" + r.getTaskId() + ">");
                                      if (r.getFinishTime().getTime()
                                              - r.getBeginExceuteTime().getTime() > 1000)
                                          taskLogger.debug("longer execution time. "
                                                  + r.info() + ",<" + index + ">,time:"
                                                  + (r.getFinishTime().getTime() - r
                                                          .getBeginExceuteTime().getTime()));
                                  }
                              } catch (Exception e) {
                                  e.printStackTrace();
                                  logger.error(e);
                              }
                              isWaiting = true;
                              r = null;
                          }
                      }
                  }
              }
          }

          /** 任務(wù)接口類 **/

          package org.ymcn.util;

          import java.util.Date;

          /**
          * 所有任務(wù)接口
          * 其他任務(wù)必須繼承訪類

          * @author obullxl
          */
          public abstract class Task implements Runnable {
              // private static Logger logger = Logger.getLogger(Task.class);
              /* 產(chǎn)生時間 */
              private Date generateTime = null;
              /* 提交執(zhí)行時間 */
              private Date submitTime = null;
              /* 開始執(zhí)行時間 */
              private Date beginExceuteTime = null;
              /* 執(zhí)行完成時間 */
              private Date finishTime = null;

              private long taskId;

              public Task() {
                  this.generateTime = new Date();
              }

              /**
              * 任務(wù)執(zhí)行入口
              */
              public void run() {
                  /**
                  * 相關(guān)執(zhí)行代碼
                  * 
                  * beginTransaction();
                  * 
                  * 執(zhí)行過程中可能產(chǎn)生新的任務(wù) subtask = taskCore();
                  * 
                  * commitTransaction();
                  * 
                  * 增加新產(chǎn)生的任務(wù) ThreadPool.getInstance().batchAddTask(taskCore());
                  */
              }

              /**
              * 所有任務(wù)的核心 所以特別的業(yè)務(wù)邏輯執(zhí)行之處
              * 
              * @throws Exception
              */
              public abstract Task[] taskCore() throws Exception;

              /**
              * 是否用到數(shù)據(jù)庫
              * 
              * @return
              */
              protected abstract boolean useDb();

              /**
              * 是否需要立即執(zhí)行
              * 
              * @return
              */
              protected abstract boolean needExecuteImmediate();

              /**
              * 任務(wù)信息
              * 
              * @return String
              */
              public abstract String info();

              public Date getGenerateTime() {
                  return generateTime;
              }

              public Date getBeginExceuteTime() {
                  return beginExceuteTime;
              }

              public void setBeginExceuteTime(Date beginExceuteTime) {
                  this.beginExceuteTime = beginExceuteTime;
              }

              public Date getFinishTime() {
                  return finishTime;
              }

              public void setFinishTime(Date finishTime) {
                  this.finishTime = finishTime;
              }

              public Date getSubmitTime() {
                  return submitTime;
              }

              public void setSubmitTime(Date submitTime) {
                  this.submitTime = submitTime;
              }

              public long getTaskId() {
                  return taskId;
              }

              public void setTaskId(long taskId) {
                  this.taskId = taskId;
              }

          } 

          posted on 2014-02-28 01:32 奮斗成就男人 閱讀(236) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 准格尔旗| 清水县| 云龙县| 黄浦区| 万全县| 荥阳市| 农安县| 德阳市| 长春市| 邛崃市| 乃东县| 通州市| 浦北县| 景宁| 库尔勒市| 宜良县| 车险| 通化市| 恩施市| 宜春市| 枞阳县| 苏尼特右旗| 穆棱市| 方正县| 秀山| 定襄县| 府谷县| 浮梁县| 宣化县| 长寿区| 三门县| 昭觉县| 莱西市| 陕西省| 海兴县| 固始县| 宁明县| 墨江| 上栗县| 平度市| 常德市|