paulwong

          FutureTask and ThreadPoolExecutor

          用ThreadPoolExecutor的時候,又想知道被執行的任務的執行情況,這時就可以用FutureTask。

          ThreadPoolTask
          package com.paul.threadPool;

          import java.io.Serializable;
          import java.util.concurrent.Callable;

          public class ThreadPoolTask implements Callable<String>, Serializable {

              
          private static final long serialVersionUID = 0;
              
              
          // 保存任務所需要的數據
              private Object threadPoolTaskData;

              
          private static int consumeTaskSleepTime = 2000;

              
          public ThreadPoolTask(Object tasks) {
                  
          this.threadPoolTaskData = tasks;
              }


              
          public synchronized String call() throws Exception {
                  
          // 處理一個任務,這里的處理方式太簡單了,僅僅是一個打印語句
                  System.out.println("開始執行任務:" + threadPoolTaskData);
                  String result 
          = "";
                  
          // //便于觀察,等待一段時間
                  try {
          //            long r = 5/0;
                      for ( int  i= 0 ; i< 100000000 ; i++){   
                          
                      }
           
                      result 
          = "OK";
                  }
           catch (Exception e) {
                      e.printStackTrace();
                      result 
          = "ERROR";
                  }

                  threadPoolTaskData 
          = null;
                  
          return result;
              }

          }

          模擬客戶端提交的線程
          package com.paul.threadPool;

          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.FutureTask;

          import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

          public class StartTaskThread implements Runnable{

          private ThreadPoolTaskExecutor threadPoolTaskExecutor;
          private int i;

          public StartTaskThread(ThreadPoolTaskExecutor threadPoolTaskExecutor,int i)
          {
          this.threadPoolTaskExecutor = threadPoolTaskExecutor;
          this.i = i;
          }


          @Override
          public synchronized void run() {
          String task
          = "task@ " + i;
          System.out.println(
          "創建任務并提交到線程池中:" + task);
          FutureTask
          <String> futureTask = new FutureTask<String>(
          new ThreadPoolTask(task));
          threadPoolTaskExecutor.execute(futureTask);
          // 在這里可以做別的任何事情
          String result = null;
          try {
          // 取得結果,同時設置超時執行時間為0.1秒。同樣可以用future.get(),不設置執行超時時間取得結果
          result = futureTask.get();

          }
          catch (InterruptedException e) {
          futureTask.cancel(
          true);
          }
          catch (ExecutionException e) {
          futureTask.cancel(
          true);
          }
          catch (Exception e) {
          futureTask.cancel(
          true);
          // 超時后,進行相應處理
          }
          finally {
           System.out.println("task@" + i + ":result=" + result);

          }


          }


          SPRING配置文件
          <?xml version="1.0" encoding="UTF-8" ?>
          <beans xmlns="http://www.springframework.org/schema/beans"
          xmlns:xsi
          ="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:p
          ="http://www.springframework.org/schema/p"
          xmlns:aop
          ="http://www.springframework.org/schema/aop"
          xmlns:tx
          ="http://www.springframework.org/schema/tx"
          xsi:schemaLocation
          ="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
          http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
          http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd"
          >



          <!-- 配置數據源 -->
          <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
          destroy-method
          ="close" p:driverClassName="com.mysql.jdbc.Driver"
          p:url
          ="jdbc:mysql://localhost:3306/mb_main?useUnicode=true&amp;characterEncoding=UTF-8&amp;useServerPrepStmts=true" p:username="root" p:password="1234" />

          <!-- 配置Jdbc模板 -->
          <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
          p:dataSource-ref
          ="dataSource" />

          <!-- 事務管理器 -->
          <bean id="transactionManager"
          class
          ="org.springframework.jdbc.datasource.DataSourceTransactionManager"
          p:dataSource-ref
          ="dataSource" />

          <tx:advice id="jdbcTxAdvice" transaction-manager="transactionManager">
          <tx:attributes>
          <tx:method name="*" />
          </tx:attributes>
          </tx:advice>

          <!-- 使用aop/tx命名空間配置事務管理,這里對service包下的服務類方法提供事務 -->
          <aop:config>
          <aop:pointcut id="jdbcServiceMethod" expression="within(com.baobaotao.service..*)" />
          <aop:advisor pointcut-ref="jdbcServiceMethod" advice-ref="jdbcTxAdvice" />
          </aop:config>

          <!-- 配置dao
          <bean id="loginLogDao" class="com.baobaotao.dao.LoginLogDao"
          p:jdbcTemplate-ref="jdbcTemplate" />
          <bean id="userDao" class="com.baobaotao.dao.UserDao"
          p:jdbcTemplate-ref="jdbcTemplate" />


          <bean id="userService" class="com.baobaotao.service.UserService"
          p:userDao-ref="userDao" p:loginLogDao-ref="loginLogDao" />
          -->

          <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

          <!-- 核心線程數,默認為1 -->
          <property name="corePoolSize" value="10" />

          <!-- 最大線程數,默認為Integer.MAX_VALUE -->
          <property name="maxPoolSize" value="50" />

          <!-- 隊列最大長度,一般需要設置值>=notifyScheduledMainExecutor.maxNum;默認為Integer.MAX_VALUE
          <property name="queueCapacity" value="1000" />
          -->

          <!-- 線程池維護線程所允許的空閑時間,默認為60s -->
          <property name="keepAliveSeconds" value="300" />

          <!-- 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者 -->
          <property name="rejectedExecutionHandler">
          <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 -->
          <!-- CallerRunsPolicy:主線程直接執行該任務,執行完之后嘗試添加下一個任務到線程池中,可以有效降低向線程池內添加任務的速度 -->
          <!-- DiscardOldestPolicy:拋棄舊的任務、暫不支持;會導致被丟棄的任務無法再次被執行 -->
          <!-- DiscardPolicy:拋棄當前任務、暫不支持;會導致被丟棄的任務無法再次被執行 -->
          <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
          </property>

          </bean>


          </beans>


          測試類
          package com.paul.threadPool;

          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.ThreadPoolExecutor;
          import java.util.concurrent.TimeUnit;

          import org.junit.Test;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
          import org.springframework.test.context.ContextConfiguration;
          import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

          @ContextConfiguration
          public class TestThreadPool extends AbstractJUnit4SpringContextTests{

          private static int produceTaskSleepTime = 10;

          private static int produceTaskMaxNumber = 1000;

          @Autowired
          private ThreadPoolTaskExecutor threadPoolTaskExecutor;

          public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
          return threadPoolTaskExecutor;
          }


          public void setThreadPoolTaskExecutor(
          ThreadPoolTaskExecutor threadPoolTaskExecutor)
          {
          this.threadPoolTaskExecutor = threadPoolTaskExecutor;
          }


          @Test
          public void testThreadPoolExecutor()
          {
          // 構造一個線程池
          final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 600,
          TimeUnit.SECONDS,
          new ArrayBlockingQueue<Runnable>(3),
          new ThreadPoolExecutor.CallerRunsPolicy());

          for (int i = 1; i <= produceTaskMaxNumber; i++) {

          try {
          Thread.sleep(produceTaskSleepTime);
          }
          catch (InterruptedException e1) {
          e1.printStackTrace();
          }

          new Thread(new StartTaskThread(threadPoolTaskExecutor,i)).start();
          }


          }


          }

          posted on 2011-12-07 15:48 paulwong 閱讀(2698) 評論(1)  編輯  收藏 所屬分類: 性能優化

          Feedback

          # re: FutureTask and ThreadPoolExecutor 2014-03-11 23:41 最代碼

          你好,我根據你的博客整理了分享了一份代碼:http://www.zuidaima.com/share/1724478138158080.htm
          并且發現你代碼中有些紕漏都修改過來了。  回復  更多評論   


          主站蜘蛛池模板: 蓬溪县| 甘洛县| 舞阳县| 英德市| 宜宾市| 潞城市| 当阳市| 海原县| 宁海县| 亳州市| 信丰县| 定安县| 桂林市| 德格县| 上林县| 武汉市| 宁乡县| 林口县| 新晃| 安新县| 黄平县| 百色市| 白山市| 肥城市| 兴化市| 藁城市| 琼中| 仙桃市| 河西区| 黎川县| 印江| 织金县| 克拉玛依市| 保定市| 萍乡市| 梁平县| 玉溪市| 三原县| 营口市| 拜泉县| 莲花县|