paulwong

          SPRING INTEGRATION + SPRING BATCH

          SPRING INTEGRATION的強項是事件驅動,但捕獲之后,要做的事是觸發一個類的方法,對于要處理大數據量的文件,就沒有辦法了,如讀取1w條記錄,然后插入數據庫。而這個正是SPRING BATCH的強項所在,因此有必要將此兩個框架整合起來用。

          場景:盯著一個文件夾,如果一有文件,此文件可能非常大的,則啟動一個BATCH JOB來處理。


          文件拉取器,監控文件夾一有新文件,則將此文件包裝成MESSAGE,丟到下一個通道中:
          <file:inbound-channel-adapter id="filePoller"
                                        channel
          ="filesAreComing" 
                                        directory
          ="file:${input.directory}"
                                        filename-pattern
          ="test*" />


          filesAreComing通道的ServiceActivator
          public JobLaunchRequest adapt(File file) throws NoSuchJobException {

              JobParameters jobParameters = new JobParametersBuilder().addString(
                      "input.file", file.getAbsolutePath()).toJobParameters();

              return new JobLaunchRequest(job, jobParameters);
          }


          jobLauncher通道的ServiceActivator
          <service-activator input-channel="jobLauncher">
              <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
                  <beans:constructor-arg ref="jobLauncher" />
              </beans:bean>
          </service-activator>


          "file.input"依賴于執行期的取值
          <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
              <property name="resource" value="#{jobParameters[input.file]}" />
               line mapper and other props
          </bean>


          參考的SPRING XML
          <?xml version="1.0" encoding="UTF-8"?>
          <beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns:beans
          ="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"
              xmlns:file
          ="http://www.springframework.org/schema/integration/file"
              xsi:schemaLocation
          ="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
                  http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
                  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                  http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"
          >

              <channel id="fileChannel"/>
              <channel id="jobLaunchRequestChannel"/>
              <channel id="jobExecutionChannel"/>
              <logging-channel-adapter channel="jobExecutionChannel" />

              <file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
                  channel
          ="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
                  <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
              </file:inbound-channel-adapter>
              
              <service-activator input-channel="fileChannel"
                                 output-channel
          ="jobLaunchRequestChannel"
                                 ref
          ="fileToJobLaunchRequestAdapter"
                                 method
          ="adapt"/>
                                 
              <service-activator input-channel="jobLaunchRequestChannel" output-channel="jobExecutionChannel">
                  <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
                      <beans:constructor-arg ref="jobLauncher" />
                  </beans:bean>
              </service-activator>

              <beans:bean id="fileToJobLaunchRequestAdapter" class="example.FileToJobLaunchRequestAdapter">
                  <beans:property name="job" ref="helloWorldJob"/>
              </beans:bean>
              
              
              <beans:bean id="fileCreationTimeComparator" class="com.paul.integration.file.filters.comparator.FileCreationTimeComparator">
              </beans:bean>

          </beans:beans>

          SPRING BATCH JOB的配置文件
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
              xmlns:xsi
          ="http://www.w3.org/2001/XMLSchema-instance"
              xmlns:batch
          ="http://www.springframework.org/schema/batch"
              xmlns:util
          ="http://www.springframework.org/schema/util"
              xsi:schemaLocation
          ="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                  http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
                  http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd"
          >

              

              <batch:job id="helloWorldJob">
                  <batch:step id="step1" next="xmlFileReadAndWriterStep">
                      <batch:tasklet ref="helloWorldTasklet"></batch:tasklet>
                  </batch:step>
                  <batch:step id="xmlFileReadAndWriterStep">
                       <batch:tasklet>
                           <batch:chunk reader="xmlReader" writer="xmlWriter" processor="xmlProcessor"
                               commit-interval
          ="10">
                           </batch:chunk>
                       </batch:tasklet>
                   </batch:step>
              </batch:job>
              
              <bean id="helloWorldTasklet" class="example.HelloWorldTasklet"></bean>
              
              <!-- XML文件讀取 -->
               <bean id="xmlReader"
                   class
          ="org.springframework.batch.item.xml.StaxEventItemReader" scope="step">
                   <property name="fragmentRootElementName" value="trade" />
                   <property name="unmarshaller" ref="tradeMarshaller" />
                   <property name="resource" value="#{jobParameters['input.file.path']}" />
               </bean>
           
               <bean id="xmlProcessor" class="com.paul.batch.XMLProcessor" />
           
               <!-- XML文件寫入 -->
              <bean id="xmlWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"
                  scope
          ="step">
                  <property name="rootTagName" value="wanggc" />
                  <property name="marshaller" ref="tradeMarshaller" />
                  <property name="resource" value="#{jobParameters['output.file.path']}" />
              </bean>

              <bean id="tradeMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">
                  <property name="aliases">
                      <util:map id="aliases">
                          <entry key="trade" value="com.paul.domain.Trade" />
                          <entry key="price" value="java.math.BigDecimal" />
                          <entry key="name" value="java.lang.String" />
                      </util:map>
                  </property>
              </bean>
              
          </beans>


          文件處理器
          package com.paul.batch;

          import org.springframework.batch.item.ItemProcessor;

          import com.paul.domain.Trade;

          public class XMLProcessor implements ItemProcessor<Trade, Trade> {

              /**
               * XML文件內容處理。
               * 
               
          */
              @Override
              public Trade process(Trade trade) throws Exception {
                  return trade;
              }
          }


          DOMAIN TRADE對象
          /*
           * Copyright 2006-2007 the original author or authors.
           *
           * Licensed under the Apache License, Version 2.0 (the "License");
           * you may not use this file except in compliance with the License.
           * You may obtain a copy of the License at
           *
           *      
          http://www.apache.org/licenses/LICENSE-2.0
           *
           * Unless required by applicable law or agreed to in writing, software
           * distributed under the License is distributed on an "AS IS" BASIS,
           * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           * See the License for the specific language governing permissions and
           * limitations under the License.
           
          */

          package com.paul.domain;

          import java.io.Serializable;
          import java.math.BigDecimal;


          /**
           * 
          @author Rob Harrop
           * 
          @author Dave Syer
           
          */
          public class Trade implements Serializable {
              private String isin = "";
              private long quantity = 0;
              private BigDecimal price = new BigDecimal(0);
              private String customer = "";
              private Long id;
              private long version = 0;

              public Trade() {
              }
              
              public Trade(String isin, long quantity, BigDecimal price, String customer){
                  this.isin = isin;
                  this.quantity = quantity;
                  this.price = price;
                  this.customer = customer;
              }

              /**
               * 
          @param id
               
          */
              public Trade(long id) {
                  this.id = id;
              }
              
              public long getId() {
                  return id;
              }
              
              public void setId(long id) {
                  this.id = id;
              }

              public long getVersion() {
                  return version;
              }

              public void setVersion(long version) {
                  this.version = version;
              }

              public void setCustomer(String customer) {
                  this.customer = customer;
              }

              public void setIsin(String isin) {
                  this.isin = isin;
              }

              public void setPrice(BigDecimal price) {
                  this.price = price;
              }

              public void setQuantity(long quantity) {
                  this.quantity = quantity;
              }

              public String getIsin() {
                  return isin;
              }

              public BigDecimal getPrice() {
                  return price;
              }

              public long getQuantity() {
                  return quantity;
              }

              public String getCustomer() {
                  return customer;
              }

              public String toString() {
                  return "Trade: [isin=" + this.isin + ",quantity=" + this.quantity + ",price="
                      + this.price + ",customer=" + this.customer + "]";
              }

              @Override
              public int hashCode() {
                  final int prime = 31;
                  int result = 1;
                  result = prime * result + ((customer == null) ? 0 : customer.hashCode());
                  result = prime * result + ((isin == null) ? 0 : isin.hashCode());
                  result = prime * result + ((price == null) ? 0 : price.hashCode());
                  result = prime * result + (int) (quantity ^ (quantity >>> 32));
                  result = prime * result + (int) (version ^ (version >>> 32));
                  return result;
              }

              @Override
              public boolean equals(Object obj) {
                  if (this == obj)
                      return true;
                  if (obj == null)
                      return false;
                  if (getClass() != obj.getClass())
                      return false;
                  Trade other = (Trade) obj;
                  if (customer == null) {
                      if (other.customer != null)
                          return false;
                  }
                  else if (!customer.equals(other.customer))
                      return false;
                  if (isin == null) {
                      if (other.isin != null)
                          return false;
                  }
                  else if (!isin.equals(other.isin))
                      return false;
                  if (price == null) {
                      if (other.price != null)
                          return false;
                  }
                  else if (!price.equals(other.price))
                      return false;
                  if (quantity != other.quantity)
                      return false;
                  if (version != other.version)
                      return false;
                  return true;
              }
              
           }


          從文件夾取出文件列表后,進行按修改時間排序的排序器
          package com.paul.integration.file.filters.comparator;

          import java.io.File;
          import java.util.Comparator;

          public class FileCreationTimeComparator implements Comparator<File>{

              @Override
              public int compare(File file1, File file2) {
                  return Long.valueOf(file2.lastModified()).compareTo(
                          Long.valueOf(file1.lastModified()));
          //        return file1.getName().compareToIgnoreCase(file2.getName());
              }
              
          }


          封裝了JOB和JOBPARAMETERS的HOLDER類

          package example;

          import java.io.File;

          import org.springframework.batch.core.Job;
          import org.springframework.batch.core.JobParameters;
          import org.springframework.batch.core.JobParametersBuilder;
          import org.springframework.batch.core.launch.NoSuchJobException;
          import org.springframework.batch.integration.launch.JobLaunchRequest;
          import org.springframework.beans.factory.InitializingBean;
          import org.springframework.integration.annotation.MessageEndpoint;
          import org.springframework.integration.annotation.ServiceActivator;
          import org.springframework.util.Assert;

          /**
           * Adapt a {
          @link File} to a {@link JobLaunchRequest} with a job parameter
           * <code>input.file</code> equal to the path of the file.
           * 
           * 
          @author Dave Syer
           * 
           
          */
          @MessageEndpoint
          public class FileToJobLaunchRequestAdapter implements InitializingBean {

              private Job job;

              public void setJob(Job job) {
                  this.job = job;
              }

              public void afterPropertiesSet() throws Exception {
                  Assert.notNull(job, "A Job must be provided");
              }

              @ServiceActivator
              public JobLaunchRequest adapt(File file) throws NoSuchJobException {

                  String fileName = file.getAbsolutePath();

                  if (!fileName.startsWith("/")) {
                      fileName = "/" + fileName;
                  }

                  fileName = "file://" + fileName;
                  
                  String outPutFilePath = "file:/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/" +
                          "spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file/output/out.xml";

                  JobParameters jobParameters = new JobParametersBuilder().
                          addString("input.file.path", fileName).
                          addString("output.file.path", outPutFilePath).
                          addLong("time.stamp", System.currentTimeMillis()).
                          toJobParameters();

                  if (job.getJobParametersIncrementer() != null) {
                      jobParameters = job.getJobParametersIncrementer().getNext(jobParameters);
                  }

                  return new JobLaunchRequest(job, jobParameters);

              }

          }


          TRADE測試數據文件
          <?xml version="1.0" encoding="UTF-8"?>
          <records>
              <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
                  <isin>XYZ0001</isin>
                  <quantity>5</quantity>
                  <price>11.39</price>
                  <customer>Customer1</customer>
              </trade>
              <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
                  <isin>XYZ0002</isin>
                  <quantity>2</quantity>
                  <price>72.99</price>
                  <customer>Customer2c</customer>
              </trade>
              <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
                  <isin>XYZ0003</isin>
                  <quantity>9</quantity>
                  <price>99.99</price>
                  <customer>Customer3</customer>
              </trade>
          </records>

          MAVEN中的JAR DEPENDENCY:
          <dependency>
              <groupId>org.springframework.integration</groupId>
              <artifactId>spring-integration-core</artifactId>
              <version>2.0.3.RELEASE</version>
          </dependency>
          <dependency>
              <groupId>org.springframework.batch</groupId>
              <artifactId>spring-batch-integration</artifactId>
              <version>1.2.1.RELEASE</version>
          </dependency>
          <dependency>
              <groupId>org.springframework.integration</groupId>
              <artifactId>spring-integration-file</artifactId>
              <version>2.0.3.RELEASE</version>
          </dependency>

          至此全部通道已打通。


          參考:
          http://stackoverflow.com/questions/7099543/launching-spring-batch-job

          http://stackoverflow.com/questions/11758672/spring-batch-flatfileitemreader-read-multiple-files

          https://github.com/SpringSource/spring-batch-admin/blob/master/spring-batch-admin-manager/src/main/java/org/springframework/batch/admin/integration/FileToJobLaunchRequestAdapter.java

          http://blog.springsource.org/2010/02/15/practical-use-of-spring-batch-and-spring-integration/

          http://www.enterpriseintegrationpatterns.com/ramblings/18_starbucks.html

          http://static.springsource.org/spring-integration/docs/2.0.3.RELEASE/reference/html/jdbc.html

          posted on 2012-10-16 00:11 paulwong 閱讀(5473) 評論(7)  編輯  收藏 所屬分類: SPRING INTERGRATIONSRPING BATCH

          Feedback

          # re: SPRING INTEGRATION + SPRING BATCH 2012-11-16 16:17 RoJeff

          你好:現在有個問題請求幫助。
          需求:數據源來自多個目錄下的文件,而且文件的格式不一樣。該怎么解決?跪求結果,謝謝了

          現在讀取多個目錄下的文件的問題已經解決了,我是通過配置多個輸入channel,一個輸出 channel。就是對多個目錄進行監控。我是沒有辦法,才這樣。因為我需要監控多個目錄。

          還個難題就是多個文件進來了,格式不一樣,怎么樣去調用不同的分割器,分割文件。或者說調用不到的讀取器

          以下是我的applicationContext-integration.xml配置

          <channel id="videoPlayerFileChannel" />
          <channel id="videoPlayerJobLaunchRequestChannel" />
          <channel id="videoPlayerJobExecutionChannel" />
          <logging-channel-adapter channel="videoPlayerJobExecutionChannel" />

          <file:inbound-channel-adapter id="videoPlayerInboundChannelAdapter"
          directory="${config.1000000022.SourcePath}" auto-create-directory="true"
          auto-startup="true" channel="videoPlayerFileChannel" filename-pattern="${config.1000000022.SourceType}"
          comparator="fileComparator">
          <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
          </file:inbound-channel-adapter>

          <service-activator input-channel="videoPlayerFileChannel"
          output-channel="videoPlayerJobLaunchRequestChannel" ref="launcher"
          method="adapt" />

          <service-activator input-channel="videoPlayerJobLaunchRequestChannel"
          output-channel="videoPlayerJobExecutionChannel">
          <beans:bean
          class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
          <beans:constructor-arg ref="jobLauncher" />
          </beans:bean>
          </service-activator>

          <beans:bean id="launcher" class="com.zjcy.cbs.pretreatment.batch.Launcher">
          <beans:property name="job" ref="pretreatJob" />
          </beans:bean>


          <beans:bean id="fileComparator"
          class="com.zjcy.cbs.pretreatment.batch.FileComparator">
          </beans:bean>


          <channel id="fileChannelMessage" />
          <channel id="messageJobLaunchRequestChannel" />
          <channel id="messageJobExecutionChannel" />
          <logging-channel-adapter channel="messageJobExecutionChannel" />


          <file:inbound-channel-adapter id="fileChannelMessage1"
          directory="${config.2000000001.SourcePath}" auto-create-directory="true" auto-startup="true"
          channel="fileChannelMessage" filename-pattern="*${config.2000000001.SourceType}" comparator="fileComparator">
          <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
          </file:inbound-channel-adapter>

          <service-activator input-channel="fileChannelMessage"
          output-channel="videoPlayerJobLaunchRequestChannel" ref="launcher"
          method="adapt" />  回復  更多評論   

          # re: SPRING INTEGRATION + SPRING BATCH 2012-11-17 22:39 paulwong

          @RoJeff
          可以在videoPlayerFileChannel中取得文件后綴名,加一個選擇判斷器之類的東西,按后綴名分派到不同的CHANNEL中。  回復  更多評論   

          # re: SPRING INTEGRATION + SPRING BATCH[未登錄] 2012-11-18 09:44 RoJeff

          @paulwong
          非常謝謝paulwong的幫助。
            回復  更多評論   

          # re: SPRING INTEGRATION + SPRING BATCH 2012-11-18 09:44 RoJeff

          @paulwong
          非常謝謝paulwong的幫助。  回復  更多評論   

          # re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 17:25 RoJeff

          @paulwong
          請教問題:
          需求:file:inbound-channel-adapter 能配置多個目錄嗎,或者說可以包括子目錄嗎?如果可以,該怎么配置?

          <file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
          channel="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
          <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
          </file:inbound-channel-adapter>  回復  更多評論   

          # re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 22:17 paulwong

          @RoJeff
          <bean name="scanner" class="org.springframework.integration.file.RecursiveLeafOnlyDirectoryScanner"/>

          <file:inbound-channel-adapter
          id="inputChannel" scanner="scanner" directory="/some/folder">  回復  更多評論   

          # re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 22:47 RoJeff

          @paulwong
          大牛就是大牛啊,這個問題得到解決,非常感覺。  回復  更多評論   


          主站蜘蛛池模板: 深圳市| 宜州市| 阿克苏市| 应城市| 澄江县| 巴彦县| 扶绥县| 兰西县| 盐源县| 乐东| 嘉定区| 丹棱县| 镇安县| 武乡县| 伊春市| 泰顺县| 滁州市| 长岛县| 密山市| 赞皇县| 屯留县| 那曲县| 田东县| 长顺县| 渝中区| 通江县| 连城县| 库尔勒市| 六枝特区| 桂阳县| 孝义市| 玛曲县| 无棣县| 清流县| 叙永县| 巩留县| 宣化县| 焦作市| 洛隆县| 聂拉木县| 广西|