小賣部有一個訂飲料服務,客戶可以通過訂單來訂購所需要飲料。小賣部提供兩種咖啡飲料
LATTE(拿鐵咖啡)和MOCHA(摩卡咖啡)。每種又都分冷飲和熱飲
整個流程如下:
1.有一個下訂單模塊,用戶可以按要求下一個或多個訂單。
2.有一個訂單處理模塊,處理訂單中那些是關于訂購飲料的。
3.有一個飲料訂購處理模塊,處理拆分訂購的具體是那些種類的飲料,把具體需要生產的飲料要求發給生產模塊
4.有一個生產模塊

這個例子利用Spring Integration實現了靈活的,可配置化的模式集成了上述這些服務模塊。
先來看一下配置文件
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- 啟動Message bus 消息服務總線 支持四個屬性
auto-startup[boolean是否自動啟動 default=true]如果設置false,則需要手動調用applicationContext.start()方法
auto-create-channels[boolean是否自動注冊MessageChannel default=false],如果使用的MessagChannle不存在
error-channel 設置錯誤時信息發送的MessageChannle,如果不設置,則使用DefaultErrorChannel
dispatcher-pool-size 使用的啟動線程數,默認為10-->
<message-bus/>
<!-- 啟動支持元數據標記 -->
<annotation-driven/>
<!-- 設置 @Component標識的元數據掃描包(package) -->
<context:component-scan base-package="org.springframework.integration.samples.cafe"/>
<!-- 下面啟動了四個 MessageChannel服務 處理接收發送端發過來的消息和把消息流轉到消息的消費端 -->
<!-- 屬性說明: capacity 消息最大容量默認為100 publish-subscribe是否是發布訂閱模式,默認為否
id bean的id名稱 datatype ? -->
<channel id="orders"/> <!-- 訂單Channel -->
<channel id="drinks"/> <!-- 飲料訂單Channel,處理飲料的類別 -->
<channel id="coldDrinks"/> <!-- 冷飲生產Channel -->
<channel id="hotDrinks"/> <!-- 熱飲生產Channel -->
<!-- 消息處理終端 接收 channel coldDrinks的消息后,執行barista.prepareColdDrink方法 生產冷飲 -->
<!-- 屬性說明: input-channel 接收消息的Channel必須 default-output-channel設置默認回復消息Channel
handler-ref 引用bean的id名稱 handler-method Handler處理方法名(參數類型必須與發送消息的payLoad使用的一致)
error-handler設置錯誤時信息發送的MessageChannle reply-handler 消息回復的Channel -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息處理終端 接收 channel hotDrinks的消息后,執行barista.prepareHotDrink方法 生產熱飲 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
<!-- 定義一個啟動下定單操作的bean,它通過 channel orders下定單 -->
<beans:bean id="cafe" class="org.springframework.integration.samples.cafe.Cafe">
<beans:property name="orderChannel" ref="orders"/>
</beans:bean>
</beans:beans>
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- 啟動Message bus 消息服務總線 支持四個屬性
auto-startup[boolean是否自動啟動 default=true]如果設置false,則需要手動調用applicationContext.start()方法
auto-create-channels[boolean是否自動注冊MessageChannel default=false],如果使用的MessagChannle不存在
error-channel 設置錯誤時信息發送的MessageChannle,如果不設置,則使用DefaultErrorChannel
dispatcher-pool-size 使用的啟動線程數,默認為10-->
<message-bus/>
<!-- 啟動支持元數據標記 -->
<annotation-driven/>
<!-- 設置 @Component標識的元數據掃描包(package) -->
<context:component-scan base-package="org.springframework.integration.samples.cafe"/>
<!-- 下面啟動了四個 MessageChannel服務 處理接收發送端發過來的消息和把消息流轉到消息的消費端 -->
<!-- 屬性說明: capacity 消息最大容量默認為100 publish-subscribe是否是發布訂閱模式,默認為否
id bean的id名稱 datatype ? -->
<channel id="orders"/> <!-- 訂單Channel -->
<channel id="drinks"/> <!-- 飲料訂單Channel,處理飲料的類別 -->
<channel id="coldDrinks"/> <!-- 冷飲生產Channel -->
<channel id="hotDrinks"/> <!-- 熱飲生產Channel -->
<!-- 消息處理終端 接收 channel coldDrinks的消息后,執行barista.prepareColdDrink方法 生產冷飲 -->
<!-- 屬性說明: input-channel 接收消息的Channel必須 default-output-channel設置默認回復消息Channel
handler-ref 引用bean的id名稱 handler-method Handler處理方法名(參數類型必須與發送消息的payLoad使用的一致)
error-handler設置錯誤時信息發送的MessageChannle reply-handler 消息回復的Channel -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息處理終端 接收 channel hotDrinks的消息后,執行barista.prepareHotDrink方法 生產熱飲 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
<!-- 定義一個啟動下定單操作的bean,它通過 channel orders下定單 -->
<beans:bean id="cafe" class="org.springframework.integration.samples.cafe.Cafe">
<beans:property name="orderChannel" ref="orders"/>
</beans:bean>
</beans:beans>
下面我們來看一下源代碼目錄:
我們來看一下整體服務是怎么啟動的
首先我們來看一下CafeDemo這個類,它觸發下定單操作、
1 public class CafeDemo {
2
3 public static void main(String[] args) {
4 //加載Spring 配置文件
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //啟動 Spring容器(啟動所有實現 org.springframework.context.Lifecycle接口的實現類的start方法)
13 context.start();
14 //從Spring容器 取得cafe實例
15 Cafe cafe = (Cafe) context.getBean("cafe");
16 DrinkOrder order = new DrinkOrder();
17 //一杯熱飲 參數說明1.飲料類型 2.數量 3.是否是冷飲(true表示冷飲)
18 Drink hotDoubleLatte = new Drink(DrinkType.LATTE, 2, false);
19 Drink icedTripleMocha = new Drink(DrinkType.MOCHA, 3, true);
20 order.addDrink(hotDoubleLatte);
21 order.addDrink(icedTripleMocha);
22 //下100個訂單
23 for (int i = 0; i < 100; i++) {
24 //調用cafe的placeOrder下訂單
25 cafe.placeOrder(order);
26 }
27 }
28 }
2
3 public static void main(String[] args) {
4 //加載Spring 配置文件
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //啟動 Spring容器(啟動所有實現 org.springframework.context.Lifecycle接口的實現類的start方法)
13 context.start();
14 //從Spring容器 取得cafe實例
15 Cafe cafe = (Cafe) context.getBean("cafe");
16 DrinkOrder order = new DrinkOrder();
17 //一杯熱飲 參數說明1.飲料類型 2.數量 3.是否是冷飲(true表示冷飲)
18 Drink hotDoubleLatte = new Drink(DrinkType.LATTE, 2, false);
19 Drink icedTripleMocha = new Drink(DrinkType.MOCHA, 3, true);
20 order.addDrink(hotDoubleLatte);
21 order.addDrink(icedTripleMocha);
22 //下100個訂單
23 for (int i = 0; i < 100; i++) {
24 //調用cafe的placeOrder下訂單
25 cafe.placeOrder(order);
26 }
27 }
28 }
下面是Cafe的源代碼
1 public class Cafe {
2
3 private MessageChannel orderChannel;
4
5
6 public void setOrderChannel(MessageChannel orderChannel) {
7 this.orderChannel = orderChannel;
8 }
9
10 //其實下訂單操作,調用的是orderChannel(orders channel)的send方法,把消息發出去
11 public void placeOrder(DrinkOrder order) {
12 this.orderChannel.send(new GenericMessage<DrinkOrder>(order));
13 //GenericMessage有三個構建方法,參考如下
14 //new GenericMessage<T>(Object id, T payload);
15 //new GenericMessage<T>(T payload);
16 //new GenericMessage<T>(T payload, MessageHeader headerToCopy)
17 }
18 }
2
3 private MessageChannel orderChannel;
4
5
6 public void setOrderChannel(MessageChannel orderChannel) {
7 this.orderChannel = orderChannel;
8 }
9
10 //其實下訂單操作,調用的是orderChannel(orders channel)的send方法,把消息發出去
11 public void placeOrder(DrinkOrder order) {
12 this.orderChannel.send(new GenericMessage<DrinkOrder>(order));
13 //GenericMessage有三個構建方法,參考如下
14 //new GenericMessage<T>(Object id, T payload);
15 //new GenericMessage<T>(T payload);
16 //new GenericMessage<T>(T payload, MessageHeader headerToCopy)
17 }
18 }
下面我們來看一下哪個類標記有@MessageEndpoint(input="orders") 表示它會消費orders Channel的消息
我們發現OrderSplitter類標記這個元數據,下面是源代碼,我們來分析
1 //標記 MessageEndpoint 元數據, input表示 設置后所有 orders Channel消息都會被OrderSplitter收到
2 @MessageEndpoint(input="orders")
3 public class OrderSplitter {
4
5 //@Splitter表示,接收消息后,調用這個類的該方法. 其的參數類型必須與message的 payload屬性一致。
6 //即在new GenericMessage<T>的泛型中指定
7 //元數據設置的 channel屬性表示,方法執行完成后,會把方法返回的結果保存到message的payload屬性后,發送到指定的channel中去
8 //這里指定發送到 drinks channel
9 @Splitter(channel="drinks")
10 public List<Drink> split(DrinkOrder order) {
11 return order.getDrinks(); //方法中,是把訂單中的飲料訂單取出來
12 }
13 }
2 @MessageEndpoint(input="orders")
3 public class OrderSplitter {
4
5 //@Splitter表示,接收消息后,調用這個類的該方法. 其的參數類型必須與message的 payload屬性一致。
6 //即在new GenericMessage<T>的泛型中指定
7 //元數據設置的 channel屬性表示,方法執行完成后,會把方法返回的結果保存到message的payload屬性后,發送到指定的channel中去
8 //這里指定發送到 drinks channel
9 @Splitter(channel="drinks")
10 public List<Drink> split(DrinkOrder order) {
11 return order.getDrinks(); //方法中,是把訂單中的飲料訂單取出來
12 }
13 }
接下來,與找OrderSplitter方法相同,我們要找哪個類標記有@MessageEndpoint(input="drinks") 表示它會消費drinks Channel的消息
找到DrinkRouter這個類
1 @MessageEndpoint(input="drinks")
2 public class DrinkRouter {
3
4 //@Router表示,接收消息后,調用這個類的該方法. 其的參數類型必須與message的 payload屬性一致。
5 //方法執行完畢后,其返回值為 在容器中定義的channel名稱。channel名稱必須存在
6 @Router
7 public String resolveDrinkChannel(Drink drink) {
8 return (drink.isIced()) ? "coldDrinks" : "hotDrinks"; //方法中,是根據處理飲料是否是冷飲,送不同的channel處理
9 }
10 }
2 public class DrinkRouter {
3
4 //@Router表示,接收消息后,調用這個類的該方法. 其的參數類型必須與message的 payload屬性一致。
5 //方法執行完畢后,其返回值為 在容器中定義的channel名稱。channel名稱必須存在
6 @Router
7 public String resolveDrinkChannel(Drink drink) {
8 return (drink.isIced()) ? "coldDrinks" : "hotDrinks"; //方法中,是根據處理飲料是否是冷飲,送不同的channel處理
9 }
10 }
備注:@Router可以把消息路由到多個channel,實現方式如下
@Router
public MessageChannel route(Message message) {
}
@Router
public List<MessageChannel> route(Message message) {
}
@Router
public String route(Foo payload) {
}
@Router
public List<String> route(Foo payload) {
}
@Router
public MessageChannel route(Message message) {

@Router
public List<MessageChannel> route(Message message) {

@Router
public String route(Foo payload) {

@Router
public List<String> route(Foo payload) {

接下來,我們就要找 MessageEndpoint 標記為處理 "coldDrinks" 和 "hotDrinks" 的類,我們發現
這個兩個類并不是通過元數據@MessageEndpoint來實現的,而是通過容器配置
(下面會演示如何用元數據配置,但元數據配置有局限性。這兩種配置方式看大家喜好,系統中都是可以使用)
下面是容器配置信息:
<!-- 消息處理終端 接收 channel coldDrinks的消息后,執行barista.prepareColdDrink方法 生產冷飲 -->
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息處理終端 接收 channel hotDrinks的消息后,執行barista.prepareHotDrink方法 生產熱飲 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
<endpoint input-channel="coldDrinks" handler-ref="barista"
handler-method="prepareColdDrink"/>
<!-- 消息處理終端 接收 channel hotDrinks的消息后,執行barista.prepareHotDrink方法 生產熱飲 -->
<endpoint input-channel="hotDrinks" handler-ref="barista"
handler-method="prepareHotDrink"/>
我們來看一下源代碼:
1 @Component //這個必須要有,表示是一個消息處理組件
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 public void prepareColdDrink(Drink drink) {
31 try {
32 Thread.sleep(this.coldDrinkDelay);
33 } catch (InterruptedException e) {
34 Thread.currentThread().interrupt();
35 }
36 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
37 }
38
39 }
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 public void prepareColdDrink(Drink drink) {
31 try {
32 Thread.sleep(this.coldDrinkDelay);
33 } catch (InterruptedException e) {
34 Thread.currentThread().interrupt();
35 }
36 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
37 }
38
39 }
如果要用元數據標識實現上述方法:
要用元數據配置,它不像容器配置,可以在一個類中,支持多個不同的Handler方法。以處理prepareColdDrink方法為例
1 @MessageEndpoint(input="coldDrinks") //加了該元數據,它會自動掃描,并作為@Componet標記處理
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 @Handler//回調處理的方法
31 public void prepareColdDrink(Drink drink) {
32 try {
33 Thread.sleep(this.coldDrinkDelay);
34 } catch (InterruptedException e) {
35 Thread.currentThread().interrupt();
36 }
37 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
38 }
39 }
2 public class Barista {
3
4 private long hotDrinkDelay = 1000;
5
6 private long coldDrinkDelay = 700;
7
8 private AtomicInteger hotDrinkCounter = new AtomicInteger();
9
10 private AtomicInteger coldDrinkCounter = new AtomicInteger();
11
12
13 public void setHotDrinkDelay(long hotDrinkDelay) {
14 this.hotDrinkDelay = hotDrinkDelay;
15 }
16
17 public void setColdDrinkDelay(long coldDrinkDelay) {
18 this.coldDrinkDelay = coldDrinkDelay;
19 }
20
21 public void prepareHotDrink(Drink drink) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 System.out.println("prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink);
28 }
29
30 @Handler//回調處理的方法
31 public void prepareColdDrink(Drink drink) {
32 try {
33 Thread.sleep(this.coldDrinkDelay);
34 } catch (InterruptedException e) {
35 Thread.currentThread().interrupt();
36 }
37 System.out.println("prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink);
38 }
39 }
這樣整個流程就執行完了,最終我們的飲料產品就按照訂單生產出來了。累了吧,喝咖啡提神著呢!!!
初充:
下面是針對 Spring Integration adapter擴展的學習筆記
JMS Adapters
jms adapters 目前有兩種實現
JmsPollingSourceAdapter 和 JmsMessageDrivenSourceAdapter. 前者是使用Srping的JmsTemplate模板類通過輪循的方式接收消息
后者是使用則通過代理Spring的DefaultMessageListenerContainer實例,實現消息驅動的方式。
xml配置如下:
JmsPollingSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsPollingSourceAdapter">
<constructor-arg ref="jmsTemplate"/>
<property name="channel" ref="exampleChannel"/>
<property name="period" value="5000"/> <!-- 輪循時間間隔 -->
<property name="messageMapper" ref=""/> <!-- message轉換 -->
</bean>
<!-- 備注:消息的轉換方式如下:
收到JMS Message消息后,SourceAdapter會調用Spring的MessageConverter實現類,把javax.jms.Message對象
轉換成普通Java對象,再調用Spring Integration的MessageMapper把該對象轉成 org.springframework.integration.message.Message對象 -->
JmsMessageDrivenSourceAdapter
<bean class="org.springframework.integration.adapter.jms.JmsMessageDrivenSourceAdapter">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="exampleQueue"/>
<property name="channel" ref="exampleChannel"/>
<property name="messageConverter" ref=""/> <!-- jms消息對象轉換 -->
<property name="messageMapper" ref="" /> <!-- 普通java對象轉換成 Spring Integration Message -->
<property name="sessionAcknowledgeMode" value="1" />
<!-- sesssion回復模式 AUTO_ACKNOWLEDGE=1 CLIENT_ACKNOWLEDGE=2 DUPS_OK_ACKNOWLEDGE=3 SESSION_TRASACTED=0-->
</bean>
另外還有一個比較有用的類JmsTargetAdapter 它實現了MessageHandler接口。它提把Spring Integration Message對象轉換成
JMS消息并發送到指定的消息隊列。與JMS服務連接的實現可以通過設定 jmsTemplate屬性引用或是connectionFactory和destination
或destinationName屬性。
<bean class="org.springframework.integration.adapter.jms.JmsTargetAdapter">
<constructor-arg ref="connectionFactory"/>
<constructor-arg value="example.queue"/>
<!--或是以下配置
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="exampleQueue"/>
或是
<constructor-arg ref="jmsTemplate"/> -->
</bean>
Good Luck!
Yours Matthew!