如何優(yōu)雅地停止SPRING BATCH中的REMOTE CHUNKING JOB
SPRING BATCH中的REMOTE CHUNKING JOB,由于是基于MASTER/SLAVE的架構(gòu),其中某個STEP是會在遠(yuǎn)程機(jī)器中執(zhí)行,如果要停止這個JOB,需要考慮兩個問題:1、什么時候發(fā)出停止指令
2、如何等待遠(yuǎn)程STEP的完成
一般停止JOB,可用JobOperator.stop(long executionId)來停止,但這個無法確定什么時候發(fā)出停止指令,如果是在CHUNK的處理中途發(fā)出,則會出現(xiàn)回滾的現(xiàn)象。
STEP_EXECUTION_ID | VERSION | STEP_NAME | JOB_EXECUTION_ID | START_TIME | END_TIME | STATUS | COMMIT_COUNT | READ_COUNT | FILTER_COUNT | WRITE_COUNT | READ_SKIP_COUNT | WRITE_SKIP_COUNT | PROCESS_SKIP_COUNT | ROLLBACK_COUNT | EXIT_CODE | EXIT_MESSAGE | LAST_UPDATED |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2304 | 169 | step2HandleXXX | 434 | 2020-06-22 16:27:54 | 2020-06-22 16:32:46 | STOPPED | 167 | 5010 | 0 | 4831 | 0 | 155 | 0 | 161 | STOPPED | org.springframework.batch.core.JobInterruptedException | 2020-06-22 16:32:46 |
另外SPRING BATCH也不會等遠(yuǎn)程STEP執(zhí)行完成,就將JOB的狀態(tài)設(shè)為Complete。
發(fā)出停止的指令應(yīng)通過ChunkListener達(dá)成:
private static final Logger log = LoggerFactory.getLogger(ItemMasterChunkListener.class);
@Override
public void beforeChunk(ChunkContext context) {
log.info("ItemMasterProcessor.beforeChunk");
}
@Override
public void afterChunk(ChunkContext context) {
log.info("ItemMasterProcessor.afterChunk");
if(XXXX.isStoppingOrPausing()) {
log.info("context.getStepContext().getStepExecution().setTerminateOnly()");
context.getStepContext().getStepExecution().setTerminateOnly();
}
}
@Override
public void afterChunkError(ChunkContext context) {
log.info("ItemMasterProcessor.afterChunkError");
}
}
配置BEAN:
@StepScope
public ItemMasterChunkListener novaXItemMasterChunkListener() {
return new ItemMasterChunkListener();
}
this.masterStepBuilderFactory
.<X, X>get("step2Handle")
.listener(itemMasterChunkListener())
.build();
由于是在CHUNK完成的時候發(fā)出停止指令,就不會出現(xiàn)ROLLBACK的情況。
等待遠(yuǎn)程STEP完成,通過讀取MQ上的MESSAGE是否被消費(fèi)完成,PENDDING的MESSAGE為0的條件即可。
/* (non-Javadoc)
* @see org.springframework.batch.core.domain.JobListener#afterJob()
*/
@Override
public void afterJob(JobExecution jobExecution) {
Integer totalPendingMessages = 0;
String queueName = "";
String messageSelector = "JOB_EXECUTION_ID=" + jobExecution.getJobInstance().getInstanceId();
do{
totalPendingMessages =
this.jmsTemplate.browseSelected(queueName, messageSelector,
(session, browser) ->
Collections.list(browser.getEnumeration()).size()
);
String brokerURL = null;
if(jmsTemplate.getConnectionFactory() instanceof JmsPoolConnectionFactory) {
JmsPoolConnectionFactory connectionFactory =
(JmsPoolConnectionFactory)jmsTemplate.getConnectionFactory();
ActiveMQConnectionFactory activeMQConnectionFactory =
(ActiveMQConnectionFactory)connectionFactory.getConnectionFactory();
brokerURL = activeMQConnectionFactory.getBrokerURL();
} else if(jmsTemplate.getConnectionFactory() instanceof CachingConnectionFactory) {
CachingConnectionFactory connectionFactory =
(CachingConnectionFactory)jmsTemplate.getConnectionFactory();
ActiveMQConnectionFactory activeMQConnectionFactory =
(ActiveMQConnectionFactory)connectionFactory.getTargetConnectionFactory();
brokerURL = activeMQConnectionFactory.getBrokerURL();
}
LOGGER.info("queueName = {}, {}, totalPendingMessages = {}, url={}",
queueName, messageSelector, totalPendingMessages, brokerURL);
Assert.notNull(totalPendingMessages, "totalPendingMessages must not be null.");
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
} while(totalPendingMessages.intValue() > 0);
}
/* (non-Javadoc)
* @see org.springframework.batch.core.domain.JobListener#beforeJob(org.springframework.batch.core.domain.JobExecution)
*/
@Override
public void beforeJob(JobExecution jobExecution) {
}
}
這樣整個JOB就能無異常地停止,且會等待遠(yuǎn)程STEP完成。
Reference:
https://docs.spring.io/spring-batch/docs/4.1.3.RELEASE/reference/html/common-patterns.html#stoppingAJobManuallyForBusinessReasons
https://stackoverflow.com/questions/13603949/count-number-of-messages-in-a-jms-queue
https://stackoverflow.com/questions/55499965/spring-batch-stop-job-execution-from-external-class
https://stackoverflow.com/questions/34621885/spring-batch-pollable-channel-with-replies-contains-chunkresponses-even-if-job
posted on 2020-06-23 11:00 paulwong 閱讀(805) 評論(0) 編輯 收藏 所屬分類: SPRING BOOT