paulwong

          My Links

          Blog Stats

          常用鏈接

          留言簿(67)

          隨筆分類(1393)

          隨筆檔案(1151)

          文章分類(7)

          文章檔案(10)

          相冊

          收藏夾(2)

          AI

          Develop

          E-BOOK

          Other

          養生

          微服務

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          60天內閱讀排行

          SPRING BATCH remote chunking模式下可同時處理多文件

          SPRING BATCH remote chunking模式下,如果要同一時間處理多個文件,按DEMO的默認配置是會報錯的。這是由於處理多個文件的MASTER方共用同一個QUEUE名,SLAVE處理完多個JOB INSTANCE後,結果都發回同一個REPLY QUEUE,MASTER就可能收到屬於其他JOB INSTANCE的回覆(JOB-INSTANCE-ID與自己的不一致),導致報錯。

          這時需更改SPRING BATCH所使用的SPRING INTEGRATION配置,把原來的CHANNEL ADAPTER改為GATEWAY組件。

          GATEWAY組件是工作在REQUEST/RESPONSE模式下,即發一個MESSAGE到某一QUEUE時,要從REPLY QUEUE等到CONSUMER返回結果時,才往下繼續。

          OUTBOUND GATEWAY:從某一CHANNEL獲取MESSAGE,發往REQUEST QUEUE,從REPLY QUEUE等到CONSUMER返回結果,將此MESSAGE發往下一CHANNEL。

          INBOUND GATEWAY:從某一QUEUE獲取MESSAGE,發往某一REQUEST CHANNEL,從REPLY CHANNEL等到返回結果,將此MESSAGE發往下一QUEUE。

          詳情參見此文:https://blog.csdn.net/alexlau8/article/details/78056064

              <!-- Master jms -->
              <int:channel id="MasterRequestChannel">
                  <int:dispatcher task-executor="RequestPublishExecutor"/>
              </int:channel>
              <task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
          <!--    <int-jms:outbound-channel-adapter 
                  connection-factory="connectionFactory" 
                  destination-name="RequestQueue" 
                  channel="MasterRequestChannel"/> 
          -->

              <int:channel id="MasterReplyChannel"/>
          <!--    <int-jms:message-driven-channel-adapter 
                  connection-factory="connectionFactory" 
                  destination-name="ReplyQueue"
                  channel="MasterReplyChannel"/> 
          -->

              <int-jms:outbound-gateway
                  
          connection-factory="connectionFactory"
                  correlation-key
          ="JMSCorrelationID"
                  request-channel
          ="MasterRequestChannel"
                  request-destination-name
          ="RequestQueue"
                  receive-timeout
          ="30000"
                  reply-channel
          ="MasterReplyChannel"
                  reply-destination-name
          ="ReplyQueue"
                  async
          ="true">
                  <int-jms:reply-listener />
              </int-jms:outbound-gateway>

              <!-- Slave jms -->
              <int:channel id="SlaveRequestChannel"/>
          <!--    <int-jms:message-driven-channel-adapter
                  connection-factory="connectionFactory" 
                  destination-name="RequestQueue"
                  channel="SlaveRequestChannel"/> 
          -->

              <int:channel id="SlaveReplyChannel"/>
          <!--    <int-jms:outbound-channel-adapter 
                  connection-factory="connectionFactory" 
                  destination-name="ReplyQueue"
                  channel="SlaveReplyChannel"/> 
          -->

              <int-jms:inbound-gateway
                  
          connection-factory="connectionFactory"
                  correlation-key
          ="JMSCorrelationID"
                  request-channel
          ="SlaveRequestChannel"
                  request-destination-name
          ="RequestQueue"
                  reply-channel
          ="SlaveReplyChannel"
                  default-reply-queue-name
          ="ReplyQueue"/>

          MASTER配置
          package com.paul.testspringbatch.config.master;

          import javax.jms.ConnectionFactory;

          import org.springframework.beans.factory.config.CustomScopeConfigurer;
          //import org.springframework.batch.core.configuration.annotation.StepScope;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Profile;
          import org.springframework.context.annotation.Scope;
          import org.springframework.context.support.SimpleThreadScope;
          import org.springframework.integration.channel.DirectChannel;
          import org.springframework.integration.channel.QueueChannel;
          import org.springframework.integration.config.EnableIntegration;
          import org.springframework.integration.dsl.IntegrationFlow;
          import org.springframework.integration.dsl.IntegrationFlows;
          import org.springframework.integration.jms.JmsOutboundGateway;

          import com.paul.testspringbatch.common.constant.IntegrationConstant;

          @Configuration
          @EnableIntegration
          @Profile("batch-master")
          public class IntegrationMasterConfiguration {
              
          //    @Value("${broker.url}")
          //    private String brokerUrl;


          //    @Bean
          //    public ActiveMQConnectionFactory connectionFactory() {
          //        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
          //        connectionFactory.setBrokerURL(this.brokerUrl);
          //        connectionFactory.setTrustAllPackages(true);
          //        return connectionFactory;
          //    }

              /*
               * Configure outbound flow (requests going to workers)
               
          */
              @Bean
          //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
              public DirectChannel requests() {
                  return new DirectChannel();
              }

          //    @Bean
          //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(requests())
          //                .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
          //                .get();
          //    }
              
               @Bean
               public CustomScopeConfigurer customScopeConfigurer() {
                   CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
                   customScopeConfigurer.addScope("thread", new SimpleThreadScope());
                   return customScopeConfigurer;
               }
               
          //     @Bean
          //     public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
          //         return new BeanFactoryPostProcessor() {
          //                
          //             @Override
          //             public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
          //                    beanFactory.registerScope("thread", new SimpleThreadScope());
          //                }
          //              };
          //     }
              
              /*
               * Configure inbound flow (replies coming from workers)
               
          */
              @Bean
              @Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
          //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
              public QueueChannel replies() {
                  return new QueueChannel();
              }

          //    @Bean
          //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
          //                .channel(replies())
          //                .get();
          //    }

              @Bean
              public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
                  JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
                  jmsOutboundGateway.setConnectionFactory(connectionFactory);
                  jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
                  jmsOutboundGateway.setRequiresReply(true);
                  jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
                  jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
                  jmsOutboundGateway.setUseReplyContainer(true);
                  jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
                  jmsOutboundGateway.setReceiveTimeout(30_000);
                  return jmsOutboundGateway;
              }

              @Bean
              public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
                  return IntegrationFlows
                                  .from(requests())//1. receive message from this channel
                                  .handle(jmsOutboundGateway(connectionFactory))
                                  .channel(replies())//5. send back the response to this channel
                                  .get();
              }

          }


          SLAVE配置:
          package com.paul.testspringbatch.config.slave;

          import javax.jms.ConnectionFactory;

          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Profile;
          import org.springframework.integration.channel.DirectChannel;
          import org.springframework.integration.config.EnableIntegration;
          import org.springframework.integration.dsl.IntegrationFlow;
          import org.springframework.integration.dsl.IntegrationFlows;
          import org.springframework.integration.jms.dsl.Jms;

          import com.paul.testspringbatch.common.constant.IntegrationConstant;

          @Configuration
          @EnableIntegration
          @Profile("batch-slave")
          public class IntegrationSlaveConfiguration {
              

              /*
               * Configure inbound flow (requests coming from the master)
               
          */
              @Bean
              public DirectChannel requests() {
                  return new DirectChannel();
              }

          //    @Bean
          //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
          //                .channel(requests())
          //                .get();
          //    }

              /*
               * Configure outbound flow (replies going to the master)
               
          */
              @Bean
              public DirectChannel replies() {
                  return new DirectChannel();
              }

          //    @Bean
          //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
          //        return IntegrationFlows
          //                .from(replies())
          //                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
          //                .get();
          //    }

              @Bean
              public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
                  return IntegrationFlows
                              .from(Jms
                                      .inboundGateway(connectionFactory)
                                      .destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
                                      .correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
                                      .requestChannel(requests())//3. send the message to this channel
                                      .replyChannel(replies())//4. waitting the result from this channel
                                      .defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
                                      )
                              .get();
              }

          }

          posted on 2019-07-16 14:38 paulwong 閱讀(850) 評論(0)  編輯  收藏 所屬分類: SPRING BATCH

          主站蜘蛛池模板: 澄江县| 吉木萨尔县| 江山市| 乳山市| 兰考县| 哈密市| 林周县| 昌黎县| 山西省| 奉化市| 凤凰县| 姚安县| 华池县| 白水县| 开江县| 宝丰县| 宿松县| 和平区| 丰台区| 双峰县| 铁力市| 阳高县| 湖北省| 安庆市| 都兰县| 福清市| 凤冈县| 扎兰屯市| 海南省| 黄浦区| 临桂县| 宝鸡市| 招远市| 宾阳县| 浪卡子县| 沙河市| 武安市| 游戏| 互助| 长阳| 抚远县|