下面是本人爭取第一時整理的學(xué)習(xí)筆記(針對最新版本)
先來看一下實際例子:
The Cafe Sample(小賣部訂餐例子)
小賣部有一個訂飲料服務(wù),客戶可以通過訂單來訂購所需要飲料。小賣部提供兩種咖啡飲料
LATTE(拿鐵咖啡)和MOCHA(摩卡咖啡)。每種又都分冷飲和熱飲
整個流程如下:
1.有一個下訂單模塊,用戶可以按要求下一個或多個訂單。
2.有一個訂單處理模塊,處理訂單中那些是關(guān)于訂購飲料的。
3.有一個飲料訂購處理模塊,處理拆分訂購的具體是那些種類的飲料,把具體需要生產(chǎn)的飲料要求發(fā)給生產(chǎn)模塊
4.有一個生產(chǎn)模塊,進(jìn)行生產(chǎn)。
5.等生成完成后,有一個訂單確認(rèn)模塊(Waiter),把訂單的生成的飲料輸出。

這個例子利用Spring Integration實現(xiàn)了靈活的,可配置化的模式集成了上述這些服務(wù)模塊。
Spring Integration提供兩種模式的工作方式(Annotation和XML)
先來看一下XML方式,進(jìn)行示例的開發(fā):
配置文件如下:
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">
<!-- 首先來配置一個GateWay組件,提供消息的發(fā)送和接收。接口Cafe,提供一個void placeOrder(Order order);方法
該方法標(biāo)記了@Gateway(requestChannel="orders"), 實現(xiàn)向orders隊列實現(xiàn)數(shù)據(jù)的發(fā)送
-->
<gateway id="cafe" service-interface="org.springframework.integration.samples.cafe.Cafe"/>
<!-- 訂單Channel -->
<channel id="orders"/>
<!-- 實現(xiàn)Splitter模式, 接收 orders隊列的消息,調(diào)用orderSplitter Bean的split方法,進(jìn)行消息的分解
并把分解后的消息,發(fā)送到drinks隊列.
-->
<splitter input-channel="orders" ref="orderSplitter" method="split" output-channel="drinks"/>
<!-- 飲料訂單Channel,處理飲料的類別 -->
<channel id="drinks"/>
<!-- 實現(xiàn)Router模式,接收 drinks隊列的消息, 并觸發(fā) drinkRouter Bean的 resolveOrderItemChannel方法
由在 resolveOrderItemChannel該方法的返回值(String--隊列名稱)表示把消息路由到那個隊列上
-->
<router input-channel="drinks" ref="drinkRouter" method="resolveOrderItemChannel"/>
<!-- 冷飲生產(chǎn)Channel 最大待處理的數(shù)據(jù)量為 10-->
<channel id="coldDrinks">
<queue capacity="10"/>
</channel>
<!-- 定義一個服務(wù)處理器,其作用是定義一個消息接收隊列 codeDrinks,一但收到消息,則
觸發(fā) barista Bean的 prepareColdDrink方法, 再把 prepareColdDrink方法的值,封成Message的
payLoad屬性,把消息再發(fā)送到preparedDrinks隊列, -->
<service-activator input-channel="coldDrinks" ref="barista"
method="prepareColdDrink" output-channel="preparedDrinks"/>
<!-- 熱飲生產(chǎn)Channel 最大待處理的數(shù)據(jù)量為 10-->
<channel id="hotDrinks">
<queue capacity="10"/>
</channel>
<!-- 定義一個服務(wù)處理器,其作用是定義一個消息接收隊列 hotDrinks,一但收到消息,則
觸發(fā) barista Bean的 prepareHotDrink 再把 prepareColdDrink方法的值,封成Message的
payLoad屬性,把消息再發(fā)送到preparedDrinks隊列, -->
<service-activator input-channel="hotDrinks" ref="barista"
method="prepareHotDrink" output-channel="preparedDrinks"/>
<!-- 定義最終進(jìn)行生產(chǎn)的消息隊列 -->
<channel id="preparedDrinks"/>
<!-- 實現(xiàn) aggregator 模式, 接收 preparedDrinks 消息, 并觸發(fā) waiter Bean的prepareDelivery方法
再把處理好的數(shù)據(jù),發(fā)送到 deliveries隊列 -->
<aggregator input-channel="preparedDrinks" ref="waiter"
method="prepareDelivery" output-channel="deliveries"/>
<!-- 定義一個 stream 適配器,接收 deliveries隊列的消息后,直接輸出到屏幕-->
<stream:stdout-channel-adapter id="deliveries"/>
<beans:bean id="orderSplitter"
class="org.springframework.integration.samples.cafe.xml.OrderSplitter"/>
<beans:bean id="drinkRouter"
class="org.springframework.integration.samples.cafe.xml.DrinkRouter"/>
<beans:bean id="barista" class="org.springframework.integration.samples.cafe.xml.Barista"/>
<beans:bean id="waiter" class="org.springframework.integration.samples.cafe.xml.Waiter"/>
</beans:beans>
我們來看一下整體服務(wù)是怎么啟動的
首先我們來看一下CafeDemo這個類,它觸發(fā)下定單操作
org.springframework.integration.samples.cafe.xml.CafeDemo
2
3 public static void main(String[] args) {
4 ////加載Spring 配置文件 "cafeDemo.xml"
5 AbstractApplicationContext context = null;
6 if(args.length > 0) {
7 context = new FileSystemXmlApplicationContext(args);
8 }
9 else {
10 context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class);
11 }
12 //取得 Cafe實列
13 Cafe cafe = (Cafe) context.getBean("cafe");
14 //準(zhǔn)備 發(fā)送100條消息(訂單)
15 for (int i = 1; i <= 100; i++) {
16 Order order = new Order(i);
17 // 一杯熱飲 參數(shù)說明1.飲料類型 2.數(shù)量 3.是否是冷飲(true表示冷飲)
18 order.addItem(DrinkType.LATTE, 2, false);
19 // 一杯冷飲 參數(shù)說明1.飲料類型 2.數(shù)量 3.是否是冷飲(true表示冷飲)
20 order.addItem(DrinkType.MOCHA, 3, true);
21 //下發(fā)訂單,把消息發(fā)給 orders 隊列
22 cafe.placeOrder(order);
23 }
24 }
25
26 }
下面是Cafe接口的源代碼
//定義GateWay, 把消息發(fā)送到 orders 隊列, Message的payLoad屬性,保存 order參數(shù)值
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
OrderSplitter 源代碼
2
3 //接收 從 orders隊列接收的 order 消息后,調(diào)用 order.getItems方法
4 //進(jìn)行訂單的分解, 返回的List<OrderItem>可會,被拆分為多個消息后(Message.payLoad),發(fā)到指定隊列
5 public List<OrderItem> split(Order order) {
6 return order.getItems();
7 }
8
9 }
10
OrderSplitter.split把消息拆分后,變成多個消息,發(fā)送到drinks隊列.由drinkRouter進(jìn)行消息的接收。
2
3 //從 drinks隊列的消息后,根據(jù)orderItem的屬性,選擇路由到不同的隊列 coldDrinks或hotDrinks
4 public String resolveOrderItemChannel(OrderItem orderItem) {
5 return (orderItem.isIced()) ? "coldDrinks" : "hotDrinks";
6 }
7
8 }
下面看一下,如果是一杯冷飲,則消息發(fā)送到 coldDrinks隊列
接收根據(jù)配置,由barista Bean的prepareColdDrink方法接收消息后,進(jìn)行處理
如果是一杯熱飲,則消息發(fā)送到 hotDrinks隊列
接收根據(jù)配置,由barista Bean的prepareHotDrink方法接收消息后,進(jìn)行處理
2
3 private long hotDrinkDelay = 5000;
4
5 private long coldDrinkDelay = 1000;
6
7 private AtomicInteger hotDrinkCounter = new AtomicInteger();
8
9 private AtomicInteger coldDrinkCounter = new AtomicInteger();
10
11
12 public void setHotDrinkDelay(long hotDrinkDelay) {
13 this.hotDrinkDelay = hotDrinkDelay;
14 }
15
16 public void setColdDrinkDelay(long coldDrinkDelay) {
17 this.coldDrinkDelay = coldDrinkDelay;
18 }
19
20 //處理熱飲訂單,并生成Drink冷料
21 public Drink prepareHotDrink(OrderItem orderItem) {
22 try {
23 Thread.sleep(this.hotDrinkDelay);
24 System.out.println(Thread.currentThread().getName()
25 + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + " for order #"
26 + orderItem.getOrder().getNumber() + ": " + orderItem);
27 return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
28 orderItem.getShots());
29 } catch (InterruptedException e) {
30 Thread.currentThread().interrupt();
31 return null;
32 }
33 }
34
35 //處理冷飲訂單,并生成Drink冷料
36 public Drink prepareColdDrink(OrderItem orderItem) {
37 try {
38 Thread.sleep(this.coldDrinkDelay);
39 System.out.println(Thread.currentThread().getName()
40 + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #"
41 + orderItem.getOrder().getNumber() + ": " + orderItem);
42 return new Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
43 orderItem.getShots());
44 } catch (InterruptedException e) {
45 Thread.currentThread().interrupt();
46 return null;
47 }
48 }
49
50 }
接下來,已經(jīng)把訂單需要生產(chǎn)的飲料已經(jīng)完成,現(xiàn)在可以交給服務(wù)員(waier)交給客人了。
這里使用的aggregate模式,讓服務(wù)器等待這個訂單的所有飲料生產(chǎn)完后的,交給客戶.
下面來介紹該應(yīng)用
一般aggregator的參照 splitter一起使用。Spring Integration會根據(jù)接收到的消息中的消息頭CORRELATION_ID 來判斷,如果有相同的CORRELATION_ID發(fā)現(xiàn),則認(rèn)為它們需要合成一組,并返回(如果沒有自定義合組接口)。
當(dāng)然Spring Integration也提供一個用戶自定的接口來判定消息合組是否滿足要求
boolean isComplete(List<Message<?>> messages);
}
isComplete的方法,收到的messages消息,都是擁用相同消息頭CORRELATION_ID的消息。
-->
<aggregator input-channel="preparedDrinks" ref="waiter"
method="prepareDelivery" output-channel="deliveries"/>
最后,完成訂單的消息會發(fā)到 waiter隊列
2
3 public Delivery prepareDelivery(List<Drink> drinks) {
4 return new Delivery(drinks);
5 }
6
7
8 }
9
10 public class Delivery {
11
12 private static final String SEPARATOR = "-----------------------";
13
14
15 private List<Drink> deliveredDrinks;
16
17 private int orderNumber;
18
19
20 public Delivery(List<Drink> deliveredDrinks) {
21 assert(deliveredDrinks.size() > 0);
22 this.deliveredDrinks = deliveredDrinks;
23 this.orderNumber = deliveredDrinks.get(0).getOrderNumber();
24 }
25
26
27 public int getOrderNumber() {
28 return orderNumber;
29 }
30
31 public List<Drink> getDeliveredDrinks() {
32 return deliveredDrinks;
33 }
34
35 @Override
36 public String toString() {
37 StringBuffer buffer = new StringBuffer(SEPARATOR + "\n");
38 buffer.append("Order #" + getOrderNumber() + "\n");
39 for (Drink drink : getDeliveredDrinks()) {
40 buffer.append(drink);
41 buffer.append("\n");
42 }
43 buffer.append(SEPARATOR + "\n");
44 return buffer.toString();
45 }
46
47 }
最后我們使用一個 stream channel adaptor把訂單生產(chǎn)完成的飲料輸出。
<stream:stdout-channel-adapter id="deliveries"/>
這樣整個流程就執(zhí)行完了,最終我們的飲料產(chǎn)品就按照訂單生產(chǎn)出來了。累了吧,喝咖啡提神著呢!!!
spring-integration官網(wǎng):http://www.springsource.org/spring-integration
關(guān)于 Annotation的介紹,將在下篇介紹。
附:xml配置介紹
Service Activator 配置
2 <service-activator input-channel="exampleChannel" ref="exampleHandler"/>
3 <!-- 會檢查 someMethod方法,是否有 @ServiceActivato 標(biāo)注 output-channel-->
4 <service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
5 <service-activator input-channel="exampleChannel" output-channel="replyChannel"
6 ref="somePojo" method="someMethod"/>
<inbound-channel-adapter>
觸發(fā)指定的方法,接收消息隊列配置(觸發(fā)輪循訪問的方式)
2 <poller>
3 <interval-trigger interval="5000"/>
4 </poller>
5 </inbound-channel-adapter>
6
7 <inbound-channel-adapter ref="source2" method="method2" channel="channel2">
8 <poller>
9 <cron-trigger expression="30 * * * * MON-FRI"/>
10 </poller>
11 </channel-adapter>
<outbound-channel-adapter/>
觸發(fā)指定的方法,發(fā)送消息
2
3 <outbound-channel-adapter channel="channel2" ref="target2" method="method2">
4 <poller>
5 <interval-trigger interval="3000"/>
6 </poller>
7 </outbound-channel-adapter>
Router
消息路由方式
2 <property name="payloadTypeChannelMap">
3 <map>
4 <entry key="java.lang.String" value-ref="stringChannel"/>
5 <entry key="java.lang.Integer" value-ref="integerChannel"/>
6 </map>
7 </property>
8 </bean>
Aggregator 消息合并
2
3 <aggregator id="completelyDefinedAggregator" 1
4 input-channel="inputChannel" 2
5 output-channel="outputChannel" 3
6 discard-channel="discardChannel" 4
7 ref="aggregatorBean" 5
8 method="add" 6
9 completion-strategy="completionStrategyBean" 7
10 completion-strategy-method="checkCompleteness" 8
11 timeout="42" 9
12 send-partial-result-on-timeout="true" 10
13 reaper-interval="135" 11
14 tracked-correlation-id-capacity="99" 12
15 send-timeout="86420000" 13 />
16
17 <channel id="outputChannel"/>
18
19 <bean id="aggregatorBean" class="sample.PojoAggregator"/>
20
21 <bean id="completionStrategyBean" class="sample.PojoCompletionStrategy"/>
The id of the aggregator is optional. |
|
The input channel of the aggregator. Required. |
|
The channel where the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves). |
|
The channel where the aggregator will send the messages that
timed out (if |
|
A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Required. |
|
A method defined on the bean referenced by |
|
A reference to a bean that implements the decision algorithm as to whether a given message group is complete. The bean can be an implementation of the CompletionStrategy interface or a POJO. In the latter case the completion-strategy-method attribute must be defined as well. Optional (by default, the aggregator . |
|
A method defined on the bean referenced by
|
|
The timeout for aggregating messages (counted from the arrival of the first message). Optional. |
|
Whether upon the expiration of the timeout, the aggregator shall try to aggregate the already arrived messages. Optional (false by default). |
|
The interval (in milliseconds) at which a reaper task is executed, checking if there are any timed out groups. Optional. |
|
The capacity of the correlation id tracker. Remembers the already processed correlation ids, preventing the formation of new groups for messages that arrive after their group has been already processed (aggregated or discarded). Optional. |
|
The timeout for sending out messages. Optional. |
配置消息合并策略
2

3 public boolean checkCompleteness(List<Long> numbers) {
4 int sum = 0;
5 for (long number: numbers) {
6 sum += number;
7 }
8 return sum >= maxValue;
9 }
10 }
Good Luck!
Yours Matthew!