paulwong

          SPRING BATCH remote chunking模式下可同時處理多文件

          SPRING BATCH remote chunking模式下,如果要同一時間處理多個文件,按DEMO的默認配置是會報錯的。這是由於處理多個文件的各個MASTER方都是用同一個QUEUE名,SLAVE處理多個JOB INSTANCE後返回的結果會混在一起,MASTER可能收到帶有其他JOB-INSTANCE-ID的回覆,導致報錯。

          這時需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY組件。

          GATEWAY組件是工作在REQUEST/RESPONSE模式下,即發一個MESSAGE到某一QUEUE時,要從REPLY QUEUE等到CONSUMER返回結果時,才往下繼續。

          OUTBOUND GATEWAY:從REQUEST CHANNEL獲取MESSAGE,發往REQUEST QUEUE,從REPLY QUEUE等到CONSUMER返回結果後,再將結果MESSAGE發往REPLY CHANNEL。

          INBOUND GATEWAY:從REQUEST QUEUE獲取MESSAGE,發往REQUEST CHANNEL,從REPLY CHANNEL等到處理結果後,再將結果MESSAGE發往REPLY QUEUE。

          詳情參見此文:https://blog.csdn.net/alexlau8/article/details/78056064

              <!-- Master jms -->
              <int:channel id="MasterRequestChannel">
                  <int:dispatcher task-executor="RequestPublishExecutor"/>
              </int:channel>
              <task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
          <!--    <int-jms:outbound-channel-adapter 
                  connection-factory="connectionFactory" 
                  destination-name="RequestQueue" 
                  channel="MasterRequestChannel"/> 
          -->

              <int:channel id="MasterReplyChannel"/>
          <!--    <int-jms:message-driven-channel-adapter 
                  connection-factory="connectionFactory" 
                  destination-name="ReplyQueue"
                  channel="MasterReplyChannel"/> 
          -->

              <int-jms:outbound-gateway
                  
          connection-factory="connectionFactory"
                  correlation-key
          ="JMSCorrelationID"
                  request-channel
          ="MasterRequestChannel"
                  request-destination-name
          ="RequestQueue"
                  receive-timeout
          ="30000"
                  reply-channel
          ="MasterReplyChannel"
                  reply-destination-name
          ="ReplyQueue"
                  async
          ="true">
                  <int-jms:reply-listener />
              </int-jms:outbound-gateway>

              <!-- Slave jms -->
              <int:channel id="SlaveRequestChannel"/>
          <!--    <int-jms:message-driven-channel-adapter
                  connection-factory="connectionFactory" 
                  destination-name="RequestQueue"
                  channel="SlaveRequestChannel"/> 
          -->

              <int:channel id="SlaveReplyChannel"/>
          <!--    <int-jms:outbound-channel-adapter 
                  connection-factory="connectionFactory" 
                  destination-name="ReplyQueue"
                  channel="SlaveReplyChannel"/> 
          -->

              <int-jms:inbound-gateway
                  
          connection-factory="connectionFactory"
                  correlation-key
          ="JMSCorrelationID"
                  request-channel
          ="SlaveRequestChannel"
                  request-destination-name
          ="RequestQueue"
                  reply-channel
          ="SlaveReplyChannel"
                  default-reply-queue-name
          ="ReplyQueue"/>

          MASTER配置:
          package com.paul.testspringbatch.config.master;

          import javax.jms.ConnectionFactory;

          import org.springframework.beans.factory.config.CustomScopeConfigurer;
          //import org.springframework.batch.core.configuration.annotation.StepScope;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Profile;
          import org.springframework.context.annotation.Scope;
          import org.springframework.context.support.SimpleThreadScope;
          import org.springframework.integration.channel.DirectChannel;
          import org.springframework.integration.channel.QueueChannel;
          import org.springframework.integration.config.EnableIntegration;
          import org.springframework.integration.dsl.IntegrationFlow;
          import org.springframework.integration.dsl.IntegrationFlows;
          import org.springframework.integration.jms.JmsOutboundGateway;

          import com.paul.testspringbatch.common.constant.IntegrationConstant;

          @Configuration
          @EnableIntegration
          @Profile("batch-master")
          public class IntegrationMasterConfiguration {
              
          //    @Value("${broker.url}")
          //    private String brokerUrl;


          //    @Bean
          //    public ActiveMQConnectionFactory connectionFactory() {
          //        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
          //        connectionFactory.setBrokerURL(this.brokerUrl);
          //        connectionFactory.setTrustAllPackages(true);
          //        return connectionFactory;
          //    }

              /*
               * Configure outbound flow (requests going to workers)
               
          */
              @Bean
          //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
              public DirectChannel requests() {
                  return new DirectChannel();
              }

          //    @Bean
          //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(requests())
          //                .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
          //                .get();
          //    }
              
               @Bean
               public CustomScopeConfigurer customScopeConfigurer() {
                   CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
                   customScopeConfigurer.addScope("thread", new SimpleThreadScope());
                   return customScopeConfigurer;
               }
               
          //     @Bean
          //     public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
          //         return new BeanFactoryPostProcessor() {
          //                
          //             @Override
          //             public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
          //                    beanFactory.registerScope("thread", new SimpleThreadScope());
          //                }
          //              };
          //     }
              
              /*
               * Configure inbound flow (replies coming from workers)
               
          */
              @Bean
              @Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
          //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
              public QueueChannel replies() {
                  return new QueueChannel();
              }

          //    @Bean
          //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
          //                .channel(replies())
          //                .get();
          //    }

              @Bean
              public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
                  JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
                  jmsOutboundGateway.setConnectionFactory(connectionFactory);
                  jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
                  jmsOutboundGateway.setRequiresReply(true);
                  jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
                  jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
                  jmsOutboundGateway.setUseReplyContainer(true);
                  jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
                  jmsOutboundGateway.setReceiveTimeout(30_000);
                  return jmsOutboundGateway;
              }

              @Bean
              public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
                  return IntegrationFlows
                                  .from(requests())//1. receive message from this channel
                                  .handle(jmsOutboundGateway(connectionFactory))
                                  .channel(replies())//5. send back the response to this channel
                                  .get();
              }

          }


          SLAVE配置:
          package com.paul.testspringbatch.config.slave;

          import javax.jms.ConnectionFactory;

          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Profile;
          import org.springframework.integration.channel.DirectChannel;
          import org.springframework.integration.config.EnableIntegration;
          import org.springframework.integration.dsl.IntegrationFlow;
          import org.springframework.integration.dsl.IntegrationFlows;
          import org.springframework.integration.jms.dsl.Jms;

          import com.paul.testspringbatch.common.constant.IntegrationConstant;

          @Configuration
          @EnableIntegration
          @Profile("batch-slave")
          public class IntegrationSlaveConfiguration {
              

              /*
               * Configure inbound flow (requests coming from the master)
               
          */
              @Bean
              public DirectChannel requests() {
                  return new DirectChannel();
              }

          //    @Bean
          //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
          //                .channel(requests())
          //                .get();
          //    }

              /*
               * Configure outbound flow (replies going to the master)
               
          */
              @Bean
              public DirectChannel replies() {
                  return new DirectChannel();
              }

          //    @Bean
          //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(replies())
          //                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
          //                .get();
          //    }

              @Bean
              public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
                  return IntegrationFlows
                              .from(Jms
                                      .inboundGateway(connectionFactory)
                                      .destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
                                      .correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
                                      .requestChannel(requests())//3. send the message to this channel
                                      .replyChannel(replies())//4. waitting the result from this channel
                                      .defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
                                      )
                              .get();
              }

          }

          posted on 2019-07-16 14:38 paulwong 閱讀(850) 評論(0)  編輯  收藏 所屬分類: SPRING BATCH

          主站蜘蛛池模板: 原平市| 和静县| 海安县| 龙州县| 视频| 冷水江市| 平泉县| 辰溪县| 哈密市| 河间市| 吉林市| 定远县| 梨树县| 新河县| 海安县| 宜黄县| 大埔县| 无棣县| 揭东县| 武鸣县| 信丰县| 南阳市| 方正县| 沧州市| 钟山县| 四平市| 泸州市| 休宁县| 工布江达县| 杭州市| 保亭| 商水县| 稷山县| 平顶山市| 三原县| 三河市| 建阳市| 安陆市| 永年县| 大田县| 江北区|