posts - 22, comments - 32, trackbacks - 0, articles - 73
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          如何在 Spring 使用@Async,@EnableAsync注釋進行異步處理:

          異步處理適用那些與業務邏輯(橫切關注點)不直接相關或者不作為其他業務邏輯輸入的部分,也可在分布式系統中解耦。

          *譯注:橫切關注點(cross-cutting concerns)指一些具有橫越多個模塊的行為,使用傳統的軟件開發方法不能夠達到有效模塊化的一類特殊關注點。*

          Spring 中,`@Async`注解可以標記異步操作。然而,使用`@Async`時有一些限制,僅僅把它加在方法上并不能確保方法會在獨立的線程中執行。如果你只是偶爾用到 `@Async`,需要格外當心。

          1. @Async 的工作機制

          首先為方法添加 `Async` 注解。接著,Spring 會基于 `proxyTargetClass` 屬性,為包含 `Async` 定義的對象創建代理(JDK Proxy/CGlib)。
          最后,Spring 會嘗試搜索與當前上下文相關的線程池,把該方法作為獨立的執行路徑提交。確切地說,Spring 會搜索唯一的 `TaskExecutor` bean 或者名為 `taskExecutor` 的 bean。如果找不到,則使用默認的 `SimpleAsyncTaskExecutor`。

          要完成上面的過程,使用中需要注意幾個限制,否則會出現 `Async` 不起作用的情況。

          2. @Async 的限制

          1. 必須在標記 `@ComponentScan` 或 `@configuration` 的類中使用 `@Async`。

          未來實現類獲取異步處理結果

          如果想要獲取異步處理的結果,可以通過未來接口的實現類調用得到()方法獲得。
          未來接口的常見實現類有FutureTask。
          在SpringBoot中,一般用AsyncResult作為異步結果。

          future 缺點:

          使用Future獲得初始化執行結果時,可以使用初始化附加方法get(),或者替換看isDone()是否為true,這兩種方法都不是很好,因為主線程也會被迫等待。

          從Java 8開始約會了CompletableFuture,它針對Future了改進之處,可以針對某些對象,當初始化任務完成或發生異常時,自動調用對象的替代方法。下面會詳細解釋:

          示例:spring boot工程初步處理業務類
          1.AsyncTaskManager
          @Service
          @EnableAsync
          public class AsyncTaskManager {
          /**
          * 這個業務注入的類
          */
          @Autowired
          private MessageDao messageDao;

          /**
          * @Async注解表示異步,后面的參數對應于線程池配置類ExecutorConfig中的方法名asyncServiceExecutor()
          * 如果不寫后面的參數,直接使用@Async注解,則是使用默認的線程池
          * Future<String>為異步返回的結果??梢酝ㄟ^get()方法獲取結果。
          * @param s
          * @throws Exception
          */
          @Async(value = "asyncTaskExecutor")
          public void transTask(String s) throws Exception {
          messageDao.getMessage(s);
          System.out.println(Thread.currentThread().getName()+"--"+s+" ;time="+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
          TimeUnit.SECONDS.sleep(6);
          }

          /**
          * 異步調用,有返回值,必須是Future類型,不然報錯
          * 如果不寫后面的參數,直接用@Async,則是使用默認的線程池。
          * 使用Future獲得異步執行結果時,要么調用阻塞方法get(),要么輪詢看isDone()是否為true,這兩種方法都不是很好,因為主線程也會被迫等待
          * @param s
          * @return
          */
          @Async(value = "asyncTaskExecutor")
          public Future<String> transTaskForFuture(String s) {
          String result=null;
          try {
          result=messageDao.getMessage(s);
          System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
          TimeUnit.SECONDS.sleep(10);
          } catch (InterruptedException e) {
          return AsyncResult.forExecutionException(e);
          }
          return AsyncResult.forValue(result);
          }

          /**
          * 基于回調的listenableFuture比上種子線程直接返回Future優質是,主線程不用等待,任務在完成后會自動執行回調代碼。
          * 因此在調用時要注冊回調代碼,包括成功回調和失敗回調
          * @param s
          * @return
          */
          @Async(value = "asyncTaskExecutor")
          public ListenableFuture<String> transTaskForCallback(String s) {
          String result=null;
          try {
          result=messageDao.getMessage(s);
          System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
          TimeUnit.SECONDS.sleep(6);
          } catch (InterruptedException e) {
          return AsyncResult.forExecutionException(e);
          }
          return AsyncResult.forValue(result);
          }

          /**
          * 從Java 8開始引入了CompletableFuture,它針對Future做了改進,可以傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法
          * 最主要是可以提供復雜的
          * CompletableFuture可以指定異步處理流程:
          * thenAccept()處理正常結果;
          * exceptional()處理異常結果;
          * thenApplyAsync()用于串行化另一個CompletableFuture;
          * anyOf()和allOf()用于并行化多個CompletableFuture。
          * 詳解請看 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
          * @param s
          * @return
          */
          @Async(value = "asyncTaskExecutor")
          public CompletableFuture<Object> transTaskForCompletableFuture(String s) {
          Object result=null;
          try {
          result=messageDao.getMessage(s);
          System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
          TimeUnit.SECONDS.sleep(6);
          } catch (Exception e) {
          return AsyncResult.forExecutionException(e).completable();
          }
          return AsyncResult.forValue(result).completable();
          }
          @Async(value = "asyncTaskExecutor")
          public CompletableFuture<Object> transTaskForCompletableFuture2(int s) {
          Object result=null;
          try {
          result=messageDao.getUserCode(s);
          System.out.println(Thread.currentThread().getName()+" 子線程開始執行...result=" + result);
          TimeUnit.SECONDS.sleep(2);
          } catch (Exception e) {
          return AsyncResult.forExecutionException(e).completable();
          }
          return AsyncResult.forValue(result).completable();
          }
          }
          Dao層業務類:


          @Repository
          public class MessageDao {

          public String getMessage(String s){
          return s;
          }

          public String callBackMessage(String s){
          return "這是注冊回調返回結果s="+s;
          }

          public String getUserCode(int id){
          return "000"+id;
          }
          public String getUserName(String code){
          return "李四";
          }
          public String getUserDepartment(String code){
          return "技術開發部";
          }
          }
           

          線程池ThreadPoolTask​​Executor

          SpringBoot中的線程池一般用ThreadPoolTask​​Executor類
          。ThreadPoolTask​​Executor繼承關系如下:

          ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor
          

          關系結構圖為:

          2.自定義線程池配置如下:

          @Configuration
          public class AsyncTaskConfig {
          /**
          * IO密集型任務 = 一般為2*CPU核心數(常出現于線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等)
          * CPU密集型任務 = 一般為CPU核心數+1(常出現于線程中:復雜算法)
          * 混合型任務 = 視機器配置和復雜度自測而定
          */
          @Bean(name = "asyncTaskExecutor")
          public ThreadPoolTaskExecutor asyncTaskExecutor() {
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          //1: 核心線程數目
          executor.setCorePoolSize(4);
          //2: 指定最大線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
          executor.setMaxPoolSize(10);
          //3: 隊列中最大的數目
          executor.setQueueCapacity(200);
          //4: 線程名稱前綴
          executor.setThreadNamePrefix("LocustTask-");
          //5:當pool已經達到max size的時候,如何處理新任務
          // CallerRunsPolicy: 會在execute 方法的調用線程中運行被拒絕的任務,如果執行程序已關閉,則會丟棄該任務
          // AbortPolicy: 拋出java.util.concurrent.RejectedExecutionException異常
          // DiscardOldestPolicy: 拋棄舊的任務
          // DiscardPolicy: 拋棄當前的任務
          executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          //6: 線程空閑后的最大存活時間(默認值 60),當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
          executor.setKeepAliveSeconds(60);
          //7:線程空閑時間,當線程空閑時間達到keepAliveSeconds(秒)時,線程會退出,直到線程數量等于corePoolSize,如果allowCoreThreadTimeout=true,則會直到線程數量等于0
          executor.setAllowCoreThreadTimeOut(false);
          executor.initialize();
          return executor;
          }
          }

          @EnableAsync開啟初步

          @EnableAsync表示開啟初始,可以放在@Controller層上方,也可以放在Applicationclass的上方,也可以直接放在業務類上例AsyncTaskManager

          @Controller
          @EnableAsync
          public class XXXController {
              @Autowired
              private AsyncTaskManager asyncTaskManager;
          
              @GetMapping("/user/getList")
              @ResponseBody
              public String getUserData(){
                  return asyncTaskManager.getAsyncResult();
              }
          }
          
          Junint 4單元測試類如下
          AsyncTaskTest:

            1 public class AsyncTaskTest extends BaseTest {
            2 
            3     @Autowired
            4     private AsyncTaskManager asyncTaskManager;
            5 
            6     @Autowired
            7     private MessageDao messageDao;
            8 
            9     /**
           10      * 單無測試方法,沒有辦法測試多線程池郊果,因為單測試方法運行完后,整個JVM進程會水銷毀,所有測試只能啟動tomcat進行測試。
           11      *
           12      * @throws Exception
           13      */
           14     @Test
           15     public void testAsyncTask() throws Exception {
           16         for (int i = 1; i <= 10; i++) {
           17             asyncTaskManager.transTask("2222");
           18         }
           19     }
           20 
           21     /**
           22      * 主線等待子線完成后,獲取返回結果
           23      *
           24      * @throws Exception
           25      */
           26     @Test
           27     public void testAsyncTaskForFuture() throws Exception {
           28         Future<String> future = asyncTaskManager.transTaskForFuture("AAA---BBB");
           29         while (true) {
           30             if (future.isDone() && !future.isCancelled()) {
           31                 System.out.println(Thread.currentThread().getName() + "子線程執行完畢");
           32                 break;
           33             } else {
           34                 Thread.sleep(2000);
           35                 System.out.println("主線程" + Thread.currentThread().getName() + "待子線程執行完畢");
           36             }
           37         }
           38     }
           39 
           40     /**
           41      * 在調用時候,主線不用等待,可以注冊回調類和方法進行
           42      *
           43      * @throws Exception
           44      */
           45     @Test
           46     public void testAsyncTaskForCallback() throws Exception {
           47         // 在主要線程設置 獨有上下文變量
           48         ThreadContext.setUserId(222222222222L);
           49         ListenableFuture<String> future = asyncTaskManager.transTaskForCallback("AAA---BBB");
           50         future.addCallback(
           51             successCallback -> {
           52                 try {
           53                     String s = future.get(2L, TimeUnit.SECONDS);
           54                     String result = messageDao.callBackMessage(s);
           55                     //在線程池中子線程獲取父線程設置變量
           56                     System.out.println("回調結果:" + result + ";parent thread value:" + ThreadContext.getUserId());
           57                 } catch (Exception e) {
           58                     e.printStackTrace();
           59                 }
           60             },
           61             FailureCallback -> {
           62                 System.out.println("子線程執行失敗.");
           63             }
           64         );
           65         Thread.sleep(20000);
           66     }
           67 
           68     /**
           69      * 驗證多線程常用的場景比如有: 4個任務需要4個線程去執行,同時成功后才執行相應操作
           70      * A,B,C,D 4 個任務
           71      * CompletableFuture.allOf()方法
           72      * 由于 allOf 聚合了多個 CompletableFuture 實例,所以它是沒有返回值的。這也是它的一個缺點
           73      * @throws Exception
           74      */
           75     @Test
           76     public void testAsyncTaskForAllOf() throws Exception {
           77         CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
           78         CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
           79         CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
           80         CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
           81         // 1: 把4個線程返回 completableFuture_3 組合成一個
           82         CompletableFuture alloff=CompletableFuture.allOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
           83         // 2:如果沒有后續的動作,可以直接 join()和get() 執行結果,主線程一直被阻塞,一直等到用戶線程返回,如果不使用join 和get 主線程不會被阻塞
           84         // CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區別在于 join() 拋出的是 unchecked Exception。
           85         String result= (String)alloff.join();
           86         System.out.println("所有任務同時完成"+result);
           87         Thread.sleep(20000);
           88     }
           89 
           90     /**
           91      * 驗證多線程常用的場景比如有: 4個任務需要4個線程去執行,同時成功后才執行相應操作
           92      * A,B,C,D 4 個任務
           93      * CompletableFuture.anyOf()方法 其中有一個執行成功,就算完成
           94      *
           95      * @throws Exception
           96      */
           97     @Test
           98     public void testAsyncTaskForAnyOf() throws Exception {
           99         CompletableFuture<Object> completableFuture_1 = asyncTaskManager.task1("task-1");
          100         CompletableFuture<Object> completableFuture_2 = asyncTaskManager.task2("task-2");
          101         CompletableFuture<Object> completableFuture_3 = asyncTaskManager.task3("task-3");
          102         CompletableFuture<Object> completableFuture_4 = asyncTaskManager.task4("task-4");
          103         CompletableFuture anyOf=CompletableFuture.anyOf(completableFuture_1,completableFuture_2,completableFuture_3,completableFuture_4);
          104         //這里利用Jdk8函數式接口lambda表達式來實現匿名內部類,?是泛型通配符
          105         Object  s=anyOf.get(1500,TimeUnit.MILLISECONDS);
          106         System.out.println(" anyof 輸出結果 s="+s);
          107         Thread.sleep(20000);
          108     }
          109 
          110     /**
          111      * 驗證多線程常用的場景比如有: 3個任務需要3個線程去執行
          112      * 根據 A 方法 異步返回結果,分別去異步執行 查詢員工名稱和部門,然后返回結果
          113      * @throws Exception
          114      */
          115     @Test
          116     public void testAsyncTaskForCompletableFuture2() throws Exception {
          117         CompletableFuture<Object> completableFuture_A = asyncTaskManager.task1("task-1");
          118         // 1: 如果A成功后返回結果,作為B的入參去執行(thenApply 方法 都是在自己當前線程中執行)
          119         CompletableFuture<Object> fetchNameFuture_B = completableFuture_A.thenApplyAsync((result) ->{
          120             return messageDao.getUserName((String)result);
          121             }
          122         );
          123         //2:B 執行成功后結果作為入參,執行C,然后返回
          124         CompletableFuture<Object> fetchNameFuture_C=fetchNameFuture_B.thenApplyAsync((result)->{
          125             return messageDao.getUserDepartment((String)result);
          126         });
          127         // join()會一直程序會一直block
          128         System.out.println(fetchNameFuture_C.join());
          129         // 手動完成一個complete,會立即執行,可以看到future調用complete(T t)會立即執行。但是complete(T t)只能調用一次,后續的重復調用會失效
          130         //future已經執行完畢能夠返回結果,此時再調用complete(T t)則會無效
          131         System.out.println(fetchNameFuture_B.complete("complete"));
          132         Thread.sleep(90000);
          133     }
          134 
          135     /**
          136      * 這個方法驗證把兩個異步線程的結果聚合起來返回
          137      * @throws Exception
          138      */
          139     @Test
          140     public void testAsyncTaskForThenCombine() throws Exception {
          141         //1: 第一個查詢查詢員工消息,
          142         CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
          143         CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
          144 
          145         CompletableFuture<Object> future=futureA.thenCombine(futureB,(resultA,resultB)->{
          146             return resultA+";"+resultB;
          147         });
          148         Object s=future.join();
          149         System.out.println(" result:"+ s);
          150         // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
          151         Thread.sleep(20000);
          152     }
          153     /**
          154      * 這個方法驗證thenAcceptBoth接口是指,接受兩個異步線程,等待兩個完成后,做下一步動作,它的第二個參數是一個消費型的函數接口
          155      *  BiConsumer 這就標明它可以對上邊傳入的異步線程的結果做處理(改變傳入線程結果的值),并且沒有返回值
          156      * @throws Exception
          157      */
          158     @Test
          159     public void testAsyncTaskForThenAcceptBoth() throws Exception {
          160         //1: 第一個查詢查詢員工消息
          161         CompletableFuture<Object> futureA = asyncTaskManager.task1("task-1");
          162         CompletableFuture<Object> futureB = asyncTaskManager.task2("task-2");
          163 
          164         CompletableFuture<Void> allResult=futureA.thenAcceptBoth(futureB,(resultA,resultB)->{
          165             String result=messageDao.getUserDepartment(resultA+";"+resultB);
          166             System.out.println("======result="+result);
          167         });
          168         // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
          169         Thread.sleep(20000);
          170     }
          171 
          172     /**
          173      * 驗證futureA,futureB 兩個異步線程,其中一個返回,就返回。
          174      * @throws Exception
          175      */
          176 
          177     @Test
          178     public void testAsyncTaskForAcceptEither() throws Exception {
          179         //1: 第一個查詢查詢員工消息
          180         CompletableFuture <Object> futureA = asyncTaskManager.task1(“ task-1”);
          181          CompletableFuture <Object> futureB = asyncTaskManager.task2(“ task-2”);
          182          futureA.acceptEither(futureB,(result)-> {
          183              字符串s = messageDao.getUserName(result +“”);
          184              System.out.println(“它的一個串行返回返回的結果:” + s);
          185          });
          186          //  主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:187
                   線程。睡眠(20000);
          188      }
          189  }
          190 

          主站蜘蛛池模板: 大化| 尼木县| 东方市| 德格县| 即墨市| 宜兴市| 繁昌县| 新绛县| 伽师县| 永丰县| 牡丹江市| 尖扎县| 琼结县| 米易县| 浦东新区| 道真| 温泉县| 舞钢市| 陵川县| 富平县| 渑池县| 习水县| 额济纳旗| 和田市| 通河县| 固始县| 石嘴山市| 九江市| 武夷山市| 博爱县| 江孜县| 泾川县| 吴堡县| 财经| 怀柔区| 芮城县| 澄江县| 新兴县| 珲春市| 塘沽区| 永和县|