paulwong

          How to Gracefully Stop a Remote Chunking Job in Spring Batch

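          A remote chunking job in Spring Batch is built on a master/slave architecture, so one of its steps runs on a remote machine. Stopping such a job raises two questions:
          1. When should the stop signal be issued?
          2. How do we wait for the remote step to finish?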

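          A job is normally stopped with JobOperator.stop(long executionId), but that call gives no control over when the stop signal is issued. If it is issued while a chunk is being processed, a rollback occurs, as in the following step execution record: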
          BATCH_STEP_EXECUTION

          STEP_EXECUTION_ID   : 2304
          VERSION             : 169
          STEP_NAME           : step2HandleXXX
          JOB_EXECUTION_ID    : 434
          START_TIME          : 2020-06-22 16:27:54
          END_TIME            : 2020-06-22 16:32:46
          STATUS              : STOPPED
          COMMIT_COUNT        : 167
          READ_COUNT          : 5010
          FILTER_COUNT        : 0
          WRITE_COUNT         : 4831
          READ_SKIP_COUNT     : 0
          WRITE_SKIP_COUNT    : 155
          PROCESS_SKIP_COUNT  : 0
          ROLLBACK_COUNT      : 161
          EXIT_CODE           : STOPPED
          EXIT_MESSAGE        : org.springframework.batch.core.JobInterruptedException
          LAST_UPDATED        : 2020-06-22 16:32:46


          In addition, Spring Batch does not wait for the remote step to finish before it marks the job as complete.

          The stop signal should be issued through a ChunkListener:

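          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.batch.core.listener.ChunkListenerSupport;
          import org.springframework.batch.core.scope.context.ChunkContext;

          public class ItemMasterChunkListener extends ChunkListenerSupport {

              private static final Logger log = LoggerFactory.getLogger(ItemMasterChunkListener.class);

              @Override
              public void beforeChunk(ChunkContext context) {
                  log.info("ItemMasterChunkListener.beforeChunk");
              }

              // Called after the chunk has executed, outside its transaction, so
              // requesting a stop here cannot interrupt a chunk that is in progress.
              @Override
              public void afterChunk(ChunkContext context) {
                  log.info("ItemMasterChunkListener.afterChunk");
                  // XXXX is the application's own stop/pause flag holder (elided in the original).
                  if (XXXX.isStoppingOrPausing()) {
                      log.info("context.getStepContext().getStepExecution().setTerminateOnly()");
                      context.getStepContext().getStepExecution().setTerminateOnly();
                  }
              }

              @Override
              public void afterChunkError(ChunkContext context) {
                  log.info("ItemMasterChunkListener.afterChunkError");
              }

          }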
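The listener calls XXXX.isStoppingOrPausing(), which the original leaves elided. As a minimal sketch of what such a flag holder could look like, assuming a single flag inside one JVM is enough: the class name StopSignal and the methods requestStop and reset are hypothetical and are not part of Spring Batch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical holder for a stop request; not part of Spring Batch. */
final class StopSignal {

    // AtomicBoolean so that a request made on one thread is visible to the step thread.
    private static final AtomicBoolean STOPPING = new AtomicBoolean(false);

    private StopSignal() {
    }

    /** Called by whatever triggers the stop, for example an admin endpoint. */
    static void requestStop() {
        STOPPING.set(true);
    }

    /** Called from afterChunk to decide whether to mark the step terminate-only. */
    static boolean isStoppingOrPausing() {
        return STOPPING.get();
    }

    /** Clears the flag before the next run. */
    static void reset() {
        STOPPING.set(false);
    }

    public static void main(String[] args) {
        System.out.println(isStoppingOrPausing()); // false
        requestStop();
        System.out.println(isStoppingOrPausing()); // true
    }
}
```

If the master runs in more than one JVM, the flag would probably have to live in shared storage such as a database row instead of a static field.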


          Configure the bean:

          @Bean
          @StepScope
          public ItemMasterChunkListener itemMasterChunkListener() {
              return new ItemMasterChunkListener();
          }

          // Inside the master step definition; the other builder calls are omitted, as in the original.
          this.masterStepBuilderFactory
                  .<X, X>get("step2Handle")
                  .listener(itemMasterChunkListener())
                  .build();


          Because the stop signal is issued only after a chunk has completed, no rollback occurs.
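The difference between stopping mid-chunk and stopping at a chunk boundary can be illustrated with a small plain-Java model. This is a simplified sketch, not Spring Batch's implementation: the class ChunkStopModel, its run method, and the rule that a mid-chunk stop discards the whole in-flight buffer are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of chunk processing; not Spring Batch's implementation. */
final class ChunkStopModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final int committedItems;
        final int rollbacks;

        Result(int committedItems, int rollbacks) {
            this.committedItems = committedItems;
            this.rollbacks = rollbacks;
        }
    }

    /**
     * Processes totalItems in chunks of chunkSize. A stop request arrives while
     * item number stopAtItem (1-based) is being handled. With stopAtBoundary the
     * request is honoured only after the current chunk commits; without it the
     * in-flight chunk is discarded immediately.
     */
    static Result run(int totalItems, int chunkSize, int stopAtItem, boolean stopAtBoundary) {
        int committed = 0;
        boolean stopRequested = false;
        List<Integer> buffer = new ArrayList<>();
        for (int item = 1; item <= totalItems; item++) {
            buffer.add(item);
            if (item == stopAtItem) {
                if (!stopAtBoundary) {
                    // Stop hits mid-chunk: the buffered items are lost (one rollback).
                    return new Result(committed, 1);
                }
                stopRequested = true;
            }
            if (buffer.size() == chunkSize || item == totalItems) {
                committed += buffer.size(); // chunk commit
                buffer.clear();
                if (stopRequested) {
                    return new Result(committed, 0); // clean stop at the boundary
                }
            }
        }
        return new Result(committed, 0);
    }

    public static void main(String[] args) {
        Result midChunk = run(10, 4, 6, false);
        Result boundary = run(10, 4, 6, true);
        System.out.println("mid-chunk stop: committed=" + midChunk.committedItems
                + ", rollbacks=" + midChunk.rollbacks);   // committed=4, rollbacks=1
        System.out.println("boundary stop: committed=" + boundary.committedItems
                + ", rollbacks=" + boundary.rollbacks);   // committed=8, rollbacks=0
    }
}
```

In the model, the boundary variant finishes the chunk that contains item 6 before stopping, so nothing is discarded.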

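          To wait for the remote step to finish, check whether all of the job's messages on the MQ have been consumed. The remote step is done once the number of pending messages is 0.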

          import java.util.Collections;

          import org.apache.activemq.ActiveMQConnectionFactory;
          import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.batch.core.JobExecution;
          import org.springframework.batch.core.JobExecutionListener;
          import org.springframework.jms.connection.CachingConnectionFactory;
          import org.springframework.jms.core.JmsTemplate;
          import org.springframework.util.Assert;

          public class JobExecutionListenerSupport implements JobExecutionListener {

              private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionListenerSupport.class);

              private final JmsTemplate jmsTemplate;

              public JobExecutionListenerSupport(JmsTemplate jmsTemplate) {
                  this.jmsTemplate = jmsTemplate;
              }

              @Override
              public void beforeJob(JobExecution jobExecution) {
                  // Nothing to do before the job starts.
              }

              // Blocks until no message for this job is left on the queue.
              @Override
              public void afterJob(JobExecution jobExecution) {
                  String queueName = ""; // queue name is left blank in the original
                  // Kept from the original: the JOB_EXECUTION_ID property is compared with the job instance id.
                  String messageSelector = "JOB_EXECUTION_ID=" + jobExecution.getJobInstance().getInstanceId();
                  String brokerURL = resolveBrokerUrl();

                  Integer totalPendingMessages;
                  do {
                      // Browse (without consuming) the messages that match the selector and count them.
                      totalPendingMessages = this.jmsTemplate.browseSelected(queueName, messageSelector,
                              (session, browser) -> Collections.list(browser.getEnumeration()).size());
                      Assert.notNull(totalPendingMessages, "totalPendingMessages must not be null.");
                      LOGGER.info("queueName = {}, {}, totalPendingMessages = {}, url={}",
                              queueName, messageSelector, totalPendingMessages, brokerURL);
                      if (totalPendingMessages > 0) {
                          try {
                              Thread.sleep(5_000);
                          } catch (InterruptedException e) {
                              LOGGER.error(e.getMessage(), e);
                              Thread.currentThread().interrupt(); // restore the flag and stop waiting
                              return;
                          }
                      }
                  } while (totalPendingMessages > 0);
              }

              // Resolves the ActiveMQ broker URL for logging; returns null for other connection factories.
              private String resolveBrokerUrl() {
                  Object factory = this.jmsTemplate.getConnectionFactory();
                  if (factory instanceof JmsPoolConnectionFactory) {
                      factory = ((JmsPoolConnectionFactory) factory).getConnectionFactory();
                  } else if (factory instanceof CachingConnectionFactory) {
                      factory = ((CachingConnectionFactory) factory).getTargetConnectionFactory();
                  }
                  return factory instanceof ActiveMQConnectionFactory
                          ? ((ActiveMQConnectionFactory) factory).getBrokerURL()
                          : null;
              }

          }
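The listener above polls until no messages remain, which would block indefinitely if a remote worker never consumes them. A possible variant adds a timeout. The sketch below is written against a plain IntSupplier so that it does not depend on JMS; the names PendingMessageWaiter and awaitDrained are hypothetical, and the timeout behaviour is an addition that the original does not have.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical helper: polls a pending-message count until it reaches zero. */
final class PendingMessageWaiter {

    private PendingMessageWaiter() {
    }

    /**
     * Polls pendingCount every pollInterval until it returns 0 or the timeout elapses.
     *
     * @param pendingCount supplies the current number of pending messages,
     *                     for example a lambda wrapping the JMS browse call
     * @return true if the count reached zero in time, false on timeout or interrupt
     */
    static boolean awaitDrained(IntSupplier pendingCount, Duration pollInterval, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (pendingCount.getAsInt() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: messages are still pending
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated queue that drains by one message per poll.
        int[] remaining = {3};
        boolean drained = awaitDrained(
                () -> remaining[0] > 0 ? remaining[0]-- : 0,
                Duration.ofMillis(10), Duration.ofSeconds(1));
        System.out.println("drained = " + drained);
    }
}
```

In afterJob the supplier could wrap the browseSelected call, and a false result could be logged so that leftover messages can be investigated.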


          This way the whole job stops without errors and waits for the remote step to complete.

          Reference:
          https://docs.spring.io/spring-batch/docs/4.1.3.RELEASE/reference/html/common-patterns.html#stoppingAJobManuallyForBusinessReasons

          https://stackoverflow.com/questions/13603949/count-number-of-messages-in-a-jms-queue

          https://stackoverflow.com/questions/55499965/spring-batch-stop-job-execution-from-external-class

          https://stackoverflow.com/questions/34621885/spring-batch-pollable-channel-with-replies-contains-chunkresponses-even-if-job


          Posted on 2020-06-23 11:00 by paulwong. Category: SPRING BOOT
