paulwong

          如何優雅地停止SPRING BATCH中的REMOTE CHUNKING JOB

          SPRING BATCH中的REMOTE CHUNKING JOB,由于是基于MASTER/SLAVE的架構,其中某個STEP是會在遠程機器中執行,如果要停止這個JOB,需要考慮兩個問題:
          1、什么時候發出停止指令
          2、如何等待遠程STEP的完成

          一般停止JOB,可用JobOperator.stop(long executionId)來停止,但這個無法確定什么時候發出停止指令,如果是在CHUNK的處理中途發出,則會出現回滾的現象。
          BATCH_STEP_EXECUTION thead tr {background-color: ActiveCaption; color: CaptionText;} th, td {vertical-align: top; font-family: "Tahoma", Arial, Helvetica, sans-serif; font-size: 8pt; padding: 4px; } table, td {border: 1px solid silver;} table {border-collapse: collapse;} thead .col0 {width: 173px;} .col0 {text-align: right;} thead .col1 {width: 82px;} .col1 {text-align: right;} thead .col2 {width: 282px;} thead .col3 {width: 164px;} .col3 {text-align: right;} thead .col4 {width: 161px;} thead .col5 {width: 161px;} thead .col6 {width: 109px;} thead .col7 {width: 127px;} .col7 {text-align: right;} thead .col8 {width: 109px;} .col8 {text-align: right;} thead .col9 {width: 118px;} .col9 {text-align: right;} thead .col10 {width: 117px;} .col10 {text-align: right;} thead .col11 {width: 142px;} .col11 {text-align: right;} thead .col12 {width: 150px;} .col12 {text-align: right;} thead .col13 {width: 166px;} .col13 {text-align: right;} thead .col14 {width: 137px;} .col14 {text-align: right;} thead .col15 {width: 109px;} thead .col16 {width: 156px;} thead .col17 {width: 161px;}
          STEP_EXECUTION_ID VERSION STEP_NAME JOB_EXECUTION_ID START_TIME END_TIME STATUS COMMIT_COUNT READ_COUNT FILTER_COUNT WRITE_COUNT READ_SKIP_COUNT WRITE_SKIP_COUNT PROCESS_SKIP_COUNT ROLLBACK_COUNT EXIT_CODE EXIT_MESSAGE LAST_UPDATED
          2304 169 step2HandleXXX 434 2020-06-22 16:27:54 2020-06-22 16:32:46 STOPPED 167 5010 0 4831 0 155 0 161 STOPPED org.springframework.batch.core.JobInterruptedException 2020-06-22 16:32:46


          另外SPRING BATCH也不會等遠程STEP執行完成,就將JOB的狀態設為Complete。

          發出停止的指令應通過ChunkListener達成:

          public class ItemMasterChunkListener extends ChunkListenerSupport{
              
              private static final Logger log = LoggerFactory.getLogger(ItemMasterChunkListener.class);
              
              
              @Override
              public void beforeChunk(ChunkContext context) {
                  log.info("ItemMasterProcessor.beforeChunk");
              }


              @Override
              public void afterChunk(ChunkContext context) {
                  log.info("ItemMasterProcessor.afterChunk");
                  if(XXXX.isStoppingOrPausing()) {
                      log.info("context.getStepContext().getStepExecution().setTerminateOnly()");
                      context.getStepContext().getStepExecution().setTerminateOnly();
                  }
              }


              @Override
              public void afterChunkError(ChunkContext context) {
                  log.info("ItemMasterProcessor.afterChunkError");
              }


          }


          配置BEAN:

          @Bean
          @StepScope
          public ItemMasterChunkListener novaXItemMasterChunkListener() {
               return new ItemMasterChunkListener();
          }
              
          this.masterStepBuilderFactory
                              .<X, X>get("step2Handle")
                              .listener(itemMasterChunkListener())
                              .build();


          由于是在CHUNK完成的時候發出停止指令,就不會出現ROLLBACK的情況。

          等待遠程STEP完成,通過讀取MQ上的MESSAGE是否被消費完成,PENDDING的MESSAGE為0的條件即可。

          public class JobExecutionListenerSupport implements JobExecutionListener {

              /* (non-Javadoc)
               * @see org.springframework.batch.core.domain.JobListener#afterJob()
               
          */
              @Override
              public void afterJob(JobExecution jobExecution) {
                  Integer totalPendingMessages = 0;
                  String queueName = "";
                  
                  
                  String messageSelector = "JOB_EXECUTION_ID=" + jobExecution.getJobInstance().getInstanceId();
                  do{
                      totalPendingMessages = 
                              this.jmsTemplate.browseSelected(queueName, messageSelector, 
                                          (session, browser) -> 
                                              Collections.list(browser.getEnumeration()).size()
                                      );
                      
                      String brokerURL = null;
                      if(jmsTemplate.getConnectionFactory() instanceof JmsPoolConnectionFactory) {
                          JmsPoolConnectionFactory connectionFactory =
                                  (JmsPoolConnectionFactory)jmsTemplate.getConnectionFactory();
                          ActiveMQConnectionFactory activeMQConnectionFactory =
                                  (ActiveMQConnectionFactory)connectionFactory.getConnectionFactory();
                          brokerURL = activeMQConnectionFactory.getBrokerURL();
                      } else if(jmsTemplate.getConnectionFactory() instanceof CachingConnectionFactory) {
                          CachingConnectionFactory connectionFactory =
                                  (CachingConnectionFactory)jmsTemplate.getConnectionFactory();
                          ActiveMQConnectionFactory activeMQConnectionFactory =
                                  (ActiveMQConnectionFactory)connectionFactory.getTargetConnectionFactory();
                          brokerURL = activeMQConnectionFactory.getBrokerURL();
                      }
                      
                      LOGGER.info("queueName = {}, {}, totalPendingMessages = {}, url={}", 
                              queueName, messageSelector, totalPendingMessages, brokerURL);
                      Assert.notNull(totalPendingMessages, "totalPendingMessages must not be null.");
                      try {
                          Thread.sleep(5_000);
                      } catch (InterruptedException e) {
                          LOGGER.error(e.getMessage(), e);
                      }
                  } while(totalPendingMessages.intValue() > 0);
                  
              }

              /* (non-Javadoc)
               * @see org.springframework.batch.core.domain.JobListener#beforeJob(org.springframework.batch.core.domain.JobExecution)
               
          */
              @Override
              public void beforeJob(JobExecution jobExecution) {
              }

          }


          這樣整個JOB就能無異常地停止,且會等待遠程STEP完成。

          Reference:
          https://docs.spring.io/spring-batch/docs/4.1.3.RELEASE/reference/html/common-patterns.html#stoppingAJobManuallyForBusinessReasons

          https://stackoverflow.com/questions/13603949/count-number-of-messages-in-a-jms-queue

          https://stackoverflow.com/questions/55499965/spring-batch-stop-job-execution-from-external-class

          https://stackoverflow.com/questions/34621885/spring-batch-pollable-channel-with-replies-contains-chunkresponses-even-if-job


          posted on 2020-06-23 11:00 paulwong 閱讀(805) 評論(0)  編輯  收藏 所屬分類: SPRING BOOT

          主站蜘蛛池模板: 屏边| 民县| 疏附县| 盐池县| 崇信县| 皮山县| 重庆市| 富锦市| 桂阳县| 印江| 浦东新区| 舟曲县| 新津县| 合山市| 海兴县| 潞西市| 灵宝市| 蒙城县| 兴城市| 苏尼特右旗| 永城市| 阳朔县| 旺苍县| 潜江市| 上杭县| 辽阳县| 蓬溪县| 临安市| 广南县| 东乌珠穆沁旗| 获嘉县| 泗洪县| 滨州市| 泾川县| 通州市| 资源县| 南汇区| 葵青区| 遵化市| 通化县| 吴川市|