paulwong

          Transform ChunkRequest/ChunkResponse to JSON format in Spring Batch remote chunking

          Spring Batch Remote Chunk模式下,遠(yuǎn)程執(zhí)行JOB時(shí),傳輸?shù)膶ο笫荂hunkRequest/ChunkResponse,無法轉(zhuǎn)成JSON格式傳輸。

          注意此處使用的是SPRING JACKSON,而不是JACKSON。一般是在SPRING INTEGRATION框架下轉(zhuǎn)換的。

          需要自定義Transformer:

          JsonToChunkRequestTransformer.java
          package com.frandorado.springbatchawsintegrationslave.transformer;

          import java.io.IOException;
          import java.util.Collection;
          import java.util.Map;
          import java.util.stream.IntStream;

          import org.springframework.batch.core.ExitStatus;
          import org.springframework.batch.core.StepContribution;
          import org.springframework.batch.core.StepExecution;
          import org.springframework.batch.integration.chunk.ChunkRequest;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.integration.aws.support.AwsHeaders;
          import org.springframework.integration.json.JsonToObjectTransformer;
          import org.springframework.messaging.Message;
          import org.springframework.stereotype.Component;

          import com.amazonaws.services.sqs.AmazonSQSAsync;
          import com.fasterxml.jackson.databind.ObjectMapper;

          @Component
          public class JsonToChunkRequestTransformer extends JsonToObjectTransformer {
              
              private static final String MESSAGE_GROUP_ID_HEADER = "message-group-id";
              
              @Autowired
              AmazonSQSAsync amazonSQSAsync;
              
              @Override
              protected Object doTransform(Message<?> message) throws Exception {
                  // ACK
                  ack(message);
                  
                  return this.getMessageBuilderFactory().withPayload(buildChunkRequest(message)).setHeader(MESSAGE_GROUP_ID_HEADER, "unique").build();
              }
              
              private ChunkRequest buildChunkRequest(Message<?> message) throws IOException {
                  Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
                  Map stepContributionMap = (Map) map.get("stepContribution");
                  Map exitStatusMap = (Map) stepContributionMap.get("exitStatus");
                  
                  StepContribution stepContribution = new StepContribution(new StepExecution("null", null));
                  ExitStatus exitStatus = new ExitStatus((String) exitStatusMap.get("exitCode"), (String) exitStatusMap.get("exitDescription"));
                  
                  IntStream.range(0, (Integer) stepContributionMap.get("readCount")).forEach(e -> stepContribution.incrementReadCount());
                  stepContribution.incrementWriteCount((Integer) stepContributionMap.get("writeCount"));
                  stepContribution.incrementFilterCount((Integer) stepContributionMap.get("filterCount"));
                  stepContribution.incrementReadSkipCount((Integer) stepContributionMap.get("readSkipCount"));
                  IntStream.range(0, (Integer) stepContributionMap.get("writeSkipCount")).forEach(e -> stepContribution.incrementWriteSkipCount());
                  IntStream.range(0, (Integer) stepContributionMap.get("processSkipCount"))
                          .forEach(e -> stepContribution.incrementProcessSkipCount());
                  stepContribution.setExitStatus(exitStatus);
                  
                  return new ChunkRequest((Integer) map.get("sequence"), (Collection) map.get("items"), (Integer) map.get("jobId"), stepContribution);
              }
              
              private void ack(Message<?> message) {
                  String receiptHandle = message.getHeaders().get(AwsHeaders.RECEIPT_HANDLE, String.class);
                  String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class);
                  String queueUrl = amazonSQSAsync.getQueueUrl(queue).getQueueUrl();
                  
                  amazonSQSAsync.deleteMessage(queueUrl, receiptHandle);
              }
          }


          JsonToChunkResponseTransformer.java
          package com.frandorado.springbatchawsintegrationmaster.transformer;

          import java.io.IOException;
          import java.util.Map;

          import org.springframework.batch.core.StepContribution;
          import org.springframework.batch.core.StepExecution;
          import org.springframework.batch.integration.chunk.ChunkResponse;
          import org.springframework.integration.json.JsonToObjectTransformer;
          import org.springframework.messaging.Message;
          import org.springframework.stereotype.Component;

          import com.fasterxml.jackson.databind.ObjectMapper;

          @Component
          public class JsonToChunkResponseTransformer extends JsonToObjectTransformer {
              
              @Override
              protected Object doTransform(Message<?> message) throws Exception {
                  return buildChunkResponse(message);
              }
              
              private ChunkResponse buildChunkResponse(Message<?> message) throws IOException {
                  Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
                  
                  Integer jobId = (Integer) map.get("jobId");
                  Integer sequence = (Integer) map.get("sequence");
                  String messageContent = (String) map.get("message");
                  Boolean status = (Boolean) map.get("successful");
                  
                  StepContribution stepContribution = new StepContribution(new StepExecution("-", null));
                  
                  return new ChunkResponse(status, sequence, Long.valueOf(jobId), stepContribution, messageContent);
              }
          }


          還有一種方式:如果第三方類不支持轉(zhuǎn)JSON,即代碼里沒有JACKSON的注解,可以采用MIXIN的方式:

          StepExecutionJacksonMixIn.java
          package org.springframework.cloud.dataflow.rest.client.support;

          import com.fasterxml.jackson.annotation.JsonCreator;
          import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
          import com.fasterxml.jackson.annotation.JsonProperty;

          import org.springframework.batch.core.StepExecution;

          /**
           * Jackson MixIn for {@link StepExecution} de-serialization.
           *
           * <p>Registered against {@code StepExecution} (see the mixIn registration in
           * the Jackson configuration) so that properties which are not serializable
           * or would recurse are ignored.
           *
           * @author Gunnar Hillert
           * @since 1.0
           */
          @JsonIgnoreProperties({ "jobExecution", "jobParameters", "jobExecutionId", "skipCount", "summary" })
          public abstract class StepExecutionJacksonMixIn {

              @JsonCreator
              StepExecutionJacksonMixIn(@JsonProperty("stepName") String stepName) {
              }

          }

          在配置文件中注冊才能使用:
          JacksonConfiguration.java
          import java.util.Locale;
          import java.util.TimeZone;

          import org.springframework.batch.core.ExitStatus;
          import org.springframework.batch.core.JobExecution;
          import org.springframework.batch.core.JobInstance;
          import org.springframework.batch.core.JobParameter;
          import org.springframework.batch.core.JobParameters;
          import org.springframework.batch.core.StepExecution;
          import org.springframework.batch.item.ExecutionContext;
          import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.support.json.Jackson2JsonObjectMapper;

          import com.fasterxml.jackson.databind.ObjectMapper;
          import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
          import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.ISO8601DateFormatWithMilliSeconds;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExecutionContextJacksonMixIn;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExitStatusJacksonMixIn;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobExecutionJacksonMixIn;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobInstanceJacksonMixIn;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParameterJacksonMixIn;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParametersJacksonMixIn;
          import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.StepExecutionJacksonMixIn;

          @Configuration
          public class JacksonConfiguration {

              @Bean
              public Jackson2JsonObjectMapper jackson2JsonObjectMapper(ObjectMapper objectMapper) {
                  return new Jackson2JsonObjectMapper(objectMapper);
              }
              
              @Bean
              public Jackson2ObjectMapperBuilderCustomizer dataflowObjectMapperBuilderCustomizer() {
                  return (builder) -> {
                      builder.dateFormat(new ISO8601DateFormatWithMilliSeconds(TimeZone.getDefault(), Locale.getDefault(), true));
                      // apply SCDF Batch Mixins to
                      
          // ignore the JobExecution in StepExecution to prevent infinite loop.
                      
          // https://github.com/spring-projects/spring-hateoas/issues/333
                      builder.mixIn(StepExecution.class, StepExecutionJacksonMixIn.class);
                      builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
                      builder.mixIn(JobExecution.class, JobExecutionJacksonMixIn.class);
                      builder.mixIn(JobParameters.class, JobParametersJacksonMixIn.class);
                      builder.mixIn(JobParameter.class, JobParameterJacksonMixIn.class);
                      builder.mixIn(JobInstance.class, JobInstanceJacksonMixIn.class);
          //            builder.mixIn(StepExecutionHistory.class, StepExecutionHistoryJacksonMixIn.class);
                      builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
                      builder.mixIn(ExitStatus.class, ExitStatusJacksonMixIn.class);
          //            objectMapper.setDateFormat(new ISO8601DateFormatWithMilliSeconds());
                      builder.modules(new JavaTimeModule(), new Jdk8Module());
                  };
              }
          }

              /**
               * Master-side integration flow for the "contribution" exchange: serialize
               * the outbound request to JSON, send it through a JMS outbound gateway,
               * then deserialize the JSON reply and route it to the replies channel.
               *
               * NOTE(review): the reply is deserialized as StepExecution here — confirm
               * this matches what the remote side actually sends back.
               */
              @Bean
              public IntegrationFlow flow4Contribution(
                      ConnectionFactory connectionFactory, 
                      JobProperties jobProperties,
                      Jackson2JsonObjectMapper jackson2JsonObjectMapper
              ) {
                  return IntegrationFlows
                              // consume from the master's request channel
                              .from(request4ContributionMaster())
                              .enrichHeaders(headerEnricherConfigurer())
                              // object -> JSON before crossing the wire
                              .transform(Transformers.toJson(jackson2JsonObjectMapper))
                              .handle(jmsOutboundGateway4Contribution(connectionFactory, jobProperties))
                              // JSON reply -> StepExecution
                              .transform(Transformers.fromJson(StepExecution.class, jackson2JsonObjectMapper))
                              .channel(replies4ContributionMaster(null))
                              .get();
              }


          https://github.com/spring-cloud/spring-cloud-dataflow/tree/master/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/support

          https://frandorado.github.io/spring/2019/07/29/spring-batch-aws-series-introduction.html

          https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-master/src/main/java/com/frandorado/springbatchawsintegrationmaster/transformer


          https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-slave/src/main/java/com/frandorado/springbatchawsintegrationslave/transformer

          posted on 2020-01-21 16:44 paulwong 閱讀(584) 評論(0)  編輯  收藏 所屬分類: SRPING BATCH

          主站蜘蛛池模板: 武清区| 马公市| 凌云县| 灌云县| 永康市| 阜康市| 墨脱县| 无为县| 清河县| 蓬安县| 阆中市| 霍山县| 泰安市| 永丰县| 双流县| 枣阳市| 大英县| 南岸区| 沂源县| 涟水县| 桦南县| 容城县| 格尔木市| 晋宁县| 桑植县| 新昌县| 兴海县| 鹤山市| 上蔡县| 西藏| 昌吉市| 安龙县| 神农架林区| 尉犁县| 怀远县| 赤水市| 石屏县| 澜沧| 绥芬河市| 贵德县| 平利县|