@import url(http://www.aygfsteel.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);
面向服務的架構和事件驅動的架構天生就有著對分布式系統的適應性,這些架構都有著模塊性、松散耦合,和適應性等特性。
“面向服務”這個術語已經演變成一個架構,在那里服務作為一個軟件組件嵌入在企業業務邏輯和特新的核心中,特性如下:
· 松散耦合:服務部與其它組件有著根深蒂固的關系
· 協議獨立:多種協議透明訪問
· 位置不可知:一個服務執行一組業務邏輯,針對這次調用返回一個結果
· 粗粒度:不論在什么位置均可訪問該服務。
· 維護無用戶狀態
一個事件驅動框架(EDA)定義了一個設計和實現一個應用系統得方法學,在這個系統里事件可傳輸于松散耦合的軟件組件和服務之間。一個事件驅動系統典型地由事件消費者和事件產生者組成。事件消費者向事件管理器訂閱事件,事件產生者向事件管理器發布事件。當事件管理器從事件產生者那接收到一個事件時,事件管理把這個事件轉送給相應的事件消費者。如果這個事件消費者是不可用的,事件管理這將保留這個事件,一段間隔之后再次轉送該事件消費者。這種事件傳送方法在基于消息的系統里就是:儲存(store)和轉送(forward)。
構建一個包含事件驅動構架的應用程序和系統,這樣就使得這些應用程序和系統響應更靈敏,因為事件驅動的系統更適合應用在不可預知的和異步的環境里。
事件驅動設計和開發的優勢:
事件驅動設計和開發所提供的優勢如下:
· 可以更容易開發和維護大規模分布式應用程序和不可預知的服務或異步服務
· 可以很容易,低成本地集成、再集成、再配置新的和已存在的英勇程序和服務
· 促進遠程組件和服務的再使用,擁有一個更靈敏、沒有Bug的開發環境
· 短期利益:更容易定制。因為設計對動態處理又更好的響應。
· 長期利益:系統和組織的狀態變得更精準,對實時變化的響應接近于同步
Mule是一個開源消息ESB框架,一個消息代理,一個分級事件驅動的框架(SEDA)。SEDA定義了一個依照分級隊列、高度并行的企業級平臺。Mule使用SED的概念增加事件處理的性能。
Mule事件對象
Mule事件對象包含事件數據和被組件所感知和操控的屬性。屬性是任意的,在事件創建之后任何時間可被設置。
org.mule.umo.UMOEvent類代表了一個在Mule環境中出現的時間。所有在組件之間發送或接收的數據都是org.mule.umo.UMOEvent的一個實體。可以訪問一個原始的或被轉換的Mule事件對象中的數據能。一個Mule事件對象使用一個與提供者管理的提供者轉換數據,提供者收到數據后把事件中的有效載荷轉換成當前組件所識別的格式。
一個Mule事件對象的有效載荷能通過org.mule.umo.UMOMessage接口訪問,一個org.mule.umo.UMOMessage實例由有效載荷和它的屬性組成。這個接口是不同技術實現的消息對象的一個抽象。
org.mule.extras.client.MuleClient類定義了一個簡單的借口,允許Mule客戶端從Mule服務器接收和發送事件數據。在大多數Mule應用程序里,事件是被一些外部的并發行為所觸發,例如一個主題上接收到消息或在目錄里一個文件被刪除。
Mule支持同步、異步和請求響應事件,事件處理和傳輸實用不同的技術例如JMS,HTTP,電子郵件和基于XML的RPC。Mule能很容易地嵌入到任何應用框架中,明確支持Spring框架。Mule也支持動態的,預定義的,基于內容的和基于規則的消息路由。Mule使得預定義的和計劃性的事務更容易,包括XA事務支持。Mule提供一個有代表性的狀態調用(REST)API提供給與Web的事件訪問。
Mule ESB模式驅動系統中所有服務,這個系統有著一個分離的消息通訊中樞。服務注冊在總線上,但不知道其他任何被注冊的消息;因此,每個服務只關心處理它收到的事件。Mule也把容器,傳輸,轉換細節從服務中分離出來,允許任何對象作為服務注冊到總線的。
下面演示了如何去發送一個同步事件到另外的Mule組件:
MuleClient類需要一個服務器URL區定義它所連接的遠程Mule服務器的終端。URL定義了傳輸協議、接收消息的地址,提供者在派遣一個事件時可以隨時使用這些信息。終端例示如下:
· vm://com.jeffhanson.receivers.Default: 使用虛擬機的提供者派遣到一個com.jeffhanson.receivers.Default
· jms://jmsProvider/accounts.topic:使用全局注冊的jmsProvider派遣一個JMS消息到ccounts.topic.
· jms://accounts.topic: 使用第一個(默認)的JMS提供者派遣JMS消息
Mule事件處理
Mule可以在三種不同的方式發送和接收事件:
1.異步方式:一個組件可通過不同的線程同時處理多個事件的發送和接收
2.同步方式:在一個組件重新工作之前,一個單一的事件必須被處理完。換言之,一個創建了事件的組件發送事件時將被阻斷,直到發送任務完成,因此,一次只允許處理一個事件
3.請求-應答方式:一個組件專門請求一個事件,然后等待一個特定的時間去接收回應。
org.mule.impl.MuleComponent實現類提供了一個具體的組件類,它包括又有創建,發送和接收事件的功能。
執行同步動作的對象應該實現org.mule.umo.lifecycle.Callable接口,這個定義了一個簡單的方法Object onCall(UMOEventContext eventContext)。Callable接口提供支持事件調用的UMO對象。雖然不是強制的,但這個接口提供了一個生命周期控制的方法,當實現這個接口的組建接收到一個消息時執行這個方法。下面展示了這個接口的簡單實現。
從onCall()方法可返回任何對象。當組件的UMOLifecycleAdapter接收這個對象時,它首先看看這個對象是否是一個UMOMessage;如果這個對象既不是UMOMessage也不是Null,那么以這個對象作為有效載荷去創建一個新的消息。這個新事件經由所配制的出站路有器發布,如果UMO對象已經配制了一個出站路由器,那么在UMOEventContext實例中不能調用setStopFurtherProcessing(true)方法
Mule使用的一個簡單的事件框架
讓我們把這幾段Mule的代碼放到一起去構建一個簡單的事件框架。這個框架包含一個負責注冊和注銷事件的管理器,可以接收事件,和負責路由同步和異步消息到他們相應的服務。
Mule的虛擬機協議要求有一個放置事件管理器工作目錄META-INF/services/org/mule/providers/vm路徑下的可配制文件,配制文件為協議定義了大量的組件,例如連接器和調度工廠。配制文件的內容如下:
一個簡單的接口定義了事件管理器的公有結構:
事件管理器類是被封裝在一個工廠類里,因此,可以依據需要去改變它的實現而不會影響到它的客戶端。事件管理器實現如下:
Mule框架依據消息有效載荷的類型來派遣消息。事件框架使用基于有效載荷的派遣機制,這種派遣機制把注冊到事件管理器中一般定義的事件方法作為事件接收器。下面的類定義了一個包含三個重載的receiveEvent()方法的服務:
事件管理器客戶端應用程序發送三個事件到測試服務中,去測試每一個receiveEvent()方法。客戶端應用程序如下:
Mule平臺簡化和抽象了前面所敘述框架的事件方面的處理,使得你發送和接收穿越一個層級結構的同步和異步消息時,不需要知道下層系統的細節。工廠模式和SOA準則的應用,則使得這個框架有了一個松散耦合和可擴展的設計。
總結
當服務和進程需要穿越多層結構,使用多種協議去交互時,設計一個有效地事件驅動的軟件系統可能變得復雜了。可是,一個使用標準模式包含適當事件管理層的面向服務架構能減少,甚至消除這些問題。
Mule 平臺提供API,組件和抽象對象,這些都可以用于去建立一個強大,健壯,事件驅動的有著良好的伸縮性和可維護性的系統。
@import url(http://www.aygfsteel.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);
“面向服務”這個術語已經演變成一個架構,在那里服務作為一個軟件組件嵌入在企業業務邏輯和特新的核心中,特性如下:
· 松散耦合:服務部與其它組件有著根深蒂固的關系
· 協議獨立:多種協議透明訪問
· 位置不可知:一個服務執行一組業務邏輯,針對這次調用返回一個結果
· 粗粒度:不論在什么位置均可訪問該服務。
· 維護無用戶狀態
一個事件驅動框架(EDA)定義了一個設計和實現一個應用系統得方法學,在這個系統里事件可傳輸于松散耦合的軟件組件和服務之間。一個事件驅動系統典型地由事件消費者和事件產生者組成。事件消費者向事件管理器訂閱事件,事件產生者向事件管理器發布事件。當事件管理器從事件產生者那接收到一個事件時,事件管理把這個事件轉送給相應的事件消費者。如果這個事件消費者是不可用的,事件管理這將保留這個事件,一段間隔之后再次轉送該事件消費者。這種事件傳送方法在基于消息的系統里就是:儲存(store)和轉送(forward)。
構建一個包含事件驅動構架的應用程序和系統,這樣就使得這些應用程序和系統響應更靈敏,因為事件驅動的系統更適合應用在不可預知的和異步的環境里。
事件驅動設計和開發的優勢:
事件驅動設計和開發所提供的優勢如下:
· 可以更容易開發和維護大規模分布式應用程序和不可預知的服務或異步服務
· 可以很容易,低成本地集成、再集成、再配置新的和已存在的英勇程序和服務
· 促進遠程組件和服務的再使用,擁有一個更靈敏、沒有Bug的開發環境
· 短期利益:更容易定制。因為設計對動態處理又更好的響應。
· 長期利益:系統和組織的狀態變得更精準,對實時變化的響應接近于同步
Mule是一個開源消息ESB框架,一個消息代理,一個分級事件驅動的框架(SEDA)。SEDA定義了一個依照分級隊列、高度并行的企業級平臺。Mule使用SED的概念增加事件處理的性能。
Mule事件對象
Mule事件對象包含事件數據和被組件所感知和操控的屬性。屬性是任意的,在事件創建之后任何時間可被設置。
org.mule.umo.UMOEvent類代表了一個在Mule環境中出現的時間。所有在組件之間發送或接收的數據都是org.mule.umo.UMOEvent的一個實體。可以訪問一個原始的或被轉換的Mule事件對象中的數據能。一個Mule事件對象使用一個與提供者管理的提供者轉換數據,提供者收到數據后把事件中的有效載荷轉換成當前組件所識別的格式。
一個Mule事件對象的有效載荷能通過org.mule.umo.UMOMessage接口訪問,一個org.mule.umo.UMOMessage實例由有效載荷和它的屬性組成。這個接口是不同技術實現的消息對象的一個抽象。
org.mule.extras.client.MuleClient類定義了一個簡單的借口,允許Mule客戶端從Mule服務器接收和發送事件數據。在大多數Mule應用程序里,事件是被一些外部的并發行為所觸發,例如一個主題上接收到消息或在目錄里一個文件被刪除。
Mule支持同步、異步和請求響應事件,事件處理和傳輸實用不同的技術例如JMS,HTTP,電子郵件和基于XML的RPC。Mule能很容易地嵌入到任何應用框架中,明確支持Spring框架。Mule也支持動態的,預定義的,基于內容的和基于規則的消息路由。Mule使得預定義的和計劃性的事務更容易,包括XA事務支持。Mule提供一個有代表性的狀態調用(REST)API提供給與Web的事件訪問。
Mule ESB模式驅動系統中所有服務,這個系統有著一個分離的消息通訊中樞。服務注冊在總線上,但不知道其他任何被注冊的消息;因此,每個服務只關心處理它收到的事件。Mule也把容器,傳輸,轉換細節從服務中分離出來,允許任何對象作為服務注冊到總線的。
下面演示了如何去發送一個同步事件到另外的Mule組件:
String componentName = "MyReceiver"; // The name of the receiving component.
String transformers = null; // A comma-separated list of transformers
// to apply to the result message.
String payload = "A test event"; // The payload of the event.
java.util.Map messageProperties = null; // Any properties to be associated
// with the payload.
MuleClient client = new MuleClient();
UMOMessage message = client.sendDirect(componentName,
transformers,
payload,
messageProperties);
System.out.println("Event result: " + message.getPayloadAsString())
String transformers = null; // A comma-separated list of transformers
// to apply to the result message.
String payload = "A test event"; // The payload of the event.
java.util.Map messageProperties = null; // Any properties to be associated
// with the payload.
MuleClient client = new MuleClient();
UMOMessage message = client.sendDirect(componentName,
transformers,
payload,
messageProperties);
System.out.println("Event result: " + message.getPayloadAsString())
MuleClient類需要一個服務器URL區定義它所連接的遠程Mule服務器的終端。URL定義了傳輸協議、接收消息的地址,提供者在派遣一個事件時可以隨時使用這些信息。終端例示如下:
· vm://com.jeffhanson.receivers.Default: 使用虛擬機的提供者派遣到一個com.jeffhanson.receivers.Default
· jms://jmsProvider/accounts.topic:使用全局注冊的jmsProvider派遣一個JMS消息到ccounts.topic.
· jms://accounts.topic: 使用第一個(默認)的JMS提供者派遣JMS消息
Mule事件處理
Mule可以在三種不同的方式發送和接收事件:
1.異步方式:一個組件可通過不同的線程同時處理多個事件的發送和接收
2.同步方式:在一個組件重新工作之前,一個單一的事件必須被處理完。換言之,一個創建了事件的組件發送事件時將被阻斷,直到發送任務完成,因此,一次只允許處理一個事件
3.請求-應答方式:一個組件專門請求一個事件,然后等待一個特定的時間去接收回應。
org.mule.impl.MuleComponent實現類提供了一個具體的組件類,它包括又有創建,發送和接收事件的功能。
執行同步動作的對象應該實現org.mule.umo.lifecycle.Callable接口,這個定義了一個簡單的方法Object onCall(UMOEventContext eventContext)。Callable接口提供支持事件調用的UMO對象。雖然不是強制的,但這個接口提供了一個生命周期控制的方法,當實現這個接口的組建接收到一個消息時執行這個方法。下面展示了這個接口的簡單實現。
import org.mule.umo.lifecycle.Callable;
public class EchoComponent
implements Callable
{
public Object onCall(UMOEventContext context) throws Exception
{
String msg = context.getMessageAsString();
// Print message to System.out
System.out.println("Received synchronous message: " + msg);
// Echo transformed message back to sender
return context.getTransformedMessage();
}
}
public class EchoComponent
implements Callable
{
public Object onCall(UMOEventContext context) throws Exception
{
String msg = context.getMessageAsString();
// Print message to System.out
System.out.println("Received synchronous message: " + msg);
// Echo transformed message back to sender
return context.getTransformedMessage();
}
}
從onCall()方法可返回任何對象。當組件的UMOLifecycleAdapter接收這個對象時,它首先看看這個對象是否是一個UMOMessage;如果這個對象既不是UMOMessage也不是Null,那么以這個對象作為有效載荷去創建一個新的消息。這個新事件經由所配制的出站路有器發布,如果UMO對象已經配制了一個出站路由器,那么在UMOEventContext實例中不能調用setStopFurtherProcessing(true)方法
Mule使用的一個簡單的事件框架
讓我們把這幾段Mule的代碼放到一起去構建一個簡單的事件框架。這個框架包含一個負責注冊和注銷事件的管理器,可以接收事件,和負責路由同步和異步消息到他們相應的服務。
Mule的虛擬機協議要求有一個放置事件管理器工作目錄META-INF/services/org/mule/providers/vm路徑下的可配制文件,配制文件為協議定義了大量的組件,例如連接器和調度工廠。配制文件的內容如下:
connector=org.mule.providers.vm.VMConnector
dispatcher.factory=org.mule.providers.vm.VMMessageDispatcherFactory
message.receiver=org.mule.providers.vm.VMMessageReceiver
message.adapter=org.mule.providers.vm.VMMessageAdapter
endpoint.builder=org.mule.impl.endpoint.ResourceNameEndpointBuilder
dispatcher.factory=org.mule.providers.vm.VMMessageDispatcherFactory
message.receiver=org.mule.providers.vm.VMMessageReceiver
message.adapter=org.mule.providers.vm.VMMessageAdapter
endpoint.builder=org.mule.impl.endpoint.ResourceNameEndpointBuilder
一個簡單的接口定義了事件管理器的公有結構:
package com.jeffhanson.mule;
import org.mule.umo.FutureMessageResult;
public interface EventManager{
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName, Object payload) throws EventException;
/**
* Sends an event message asynchronously to a given service.
*
* @param serviceName The name of the service to which the event message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any.
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName, Object payload) throws EventException;
/**
* Starts this event manager.
*/
public void start();
/**
* Stops this event manager.
*/
public void stop();
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol();
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service or a fully-qualified class name.
*/
public void registerService(String serviceName, String implementation) throws EventException;
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName) throws EventException;
}
import org.mule.umo.FutureMessageResult;
public interface EventManager{
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName, Object payload) throws EventException;
/**
* Sends an event message asynchronously to a given service.
*
* @param serviceName The name of the service to which the event message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any.
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName, Object payload) throws EventException;
/**
* Starts this event manager.
*/
public void start();
/**
* Stops this event manager.
*/
public void stop();
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol();
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service or a fully-qualified class name.
*/
public void registerService(String serviceName, String implementation) throws EventException;
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName) throws EventException;
}
事件管理器類是被封裝在一個工廠類里,因此,可以依據需要去改變它的實現而不會影響到它的客戶端。事件管理器實現如下:
package com.jeffhanson.mule;
import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;
import java.util.HashMap;
import java.util.Map;
public class EventManagerFactory{
private static HashMap instances = new HashMap();
/**
* Retrieves the event manager instance for a given protocol.
*
* @param protocol The protocol to use.
* @return EventManager The event manager instance.
*/
public static EventManager getInstance(String protocol){
EventManager instance = (EventManager)instances.get(protocol);
if (instance == null){
instance = new EventManagerImpl(protocol);
instances.put(protocol, instance);
}
return instance;
}
/**
* A concrete implementation for a simple event manager.
*/
private static class EventManagerImpl implements EventManager {
private UMOManager manager = null;
private QuickConfigurationBuilder builder = null;
private MuleClient eventClient = null;
private String protocol = null;
private MuleEndpoint receiveEndpoint = null;
private MuleEndpoint sendEndpoint = null;
private EventManagerImpl(String protocol){
this.protocol = protocol;
}
/**
* Starts this event manager.
*/
public void start() {
try {
builder = new QuickConfigurationBuilder();
manager = builder.createStartedManager(true, protocol + "tmp/events");
eventClient = new MuleClient();
receiveEndpoint = new MuleEndpoint(protocol + "tmp/events/receive");
sendEndpoint = new MuleEndpoint(protocol + "tmp/events/send");
}
catch (UMOException e) {
System.err.println(e);
}
}
/**
* Stops this event manager.
*/
public void stop() {
try {
manager.stop();
}
catch (UMOException e) {
System.err.println(e);
}
}
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol() {
return protocol;
}
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service
* or a fully-qualified class name
* to use as the component implementation.
*/
public void registerService(String serviceName, String implementation) throws EventException {
if (!manager.getModel().isComponentRegistered(serviceName)) {
try {
builder.registerComponent(implementation, serviceName, receiveEndpoint, sendEndpoint);
} catch (UMOException e) {
throw new EventException(e.toString());
}
}
}
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName) throws EventException {
try {
builder.unregisterComponent(serviceName);
}
catch (UMOException e) {
throw new EventException(e.toString());
}
}
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName, Object payload) throws EventException {
try {
if (!manager.getModel().isComponentRegistered(serviceName)){
throw new EventException("Service: " + serviceName + " is not registered.");
}
String transformers = null;
Map messageProperties = null;
UMOMessage result = eventClient.sendDirect(serviceName, transformers, payload, messageProperties);
if (result == null) {
return null;
}
return result.getPayload();
} catch (UMOException e) {
throw new EventException(e.toString());
} catch (Exception e) {
throw new EventException(e.toString());
}
}
/**
* Sends an event message asynchronously.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName, Object payload) throws EventException {
FutureMessageResult result = null;
try {
if (!manager.getModel().isComponentRegistered(serviceName)) {
throw new EventException("Service: " + serviceName + " is not registered.");
}
String transformers = null;
Map messageProperties = null;
result = eventClient.sendDirectAsync(serviceName, transformers, payload, messageProperties);
} catch (UMOException e) {
throw new EventException(e.toString());
}
return result;
}
}
}
import org.mule.umo.*;
import org.mule.extras.client.MuleClient;
import org.mule.impl.endpoint.MuleEndpoint;
import org.mule.config.QuickConfigurationBuilder;
import java.util.HashMap;
import java.util.Map;
public class EventManagerFactory{
private static HashMap instances = new HashMap();
/**
* Retrieves the event manager instance for a given protocol.
*
* @param protocol The protocol to use.
* @return EventManager The event manager instance.
*/
public static EventManager getInstance(String protocol){
EventManager instance = (EventManager)instances.get(protocol);
if (instance == null){
instance = new EventManagerImpl(protocol);
instances.put(protocol, instance);
}
return instance;
}
/**
* A concrete implementation for a simple event manager.
*/
private static class EventManagerImpl implements EventManager {
private UMOManager manager = null;
private QuickConfigurationBuilder builder = null;
private MuleClient eventClient = null;
private String protocol = null;
private MuleEndpoint receiveEndpoint = null;
private MuleEndpoint sendEndpoint = null;
private EventManagerImpl(String protocol){
this.protocol = protocol;
}
/**
* Starts this event manager.
*/
public void start() {
try {
builder = new QuickConfigurationBuilder();
manager = builder.createStartedManager(true, protocol + "tmp/events");
eventClient = new MuleClient();
receiveEndpoint = new MuleEndpoint(protocol + "tmp/events/receive");
sendEndpoint = new MuleEndpoint(protocol + "tmp/events/send");
}
catch (UMOException e) {
System.err.println(e);
}
}
/**
* Stops this event manager.
*/
public void stop() {
try {
manager.stop();
}
catch (UMOException e) {
System.err.println(e);
}
}
/**
* Retrieves the protocol this event manager uses.
* @return
*/
public String getProtocol() {
return protocol;
}
/**
* Registers a service to receive event messages.
*
* @param serviceName The name to associate with the service.
* @param implementation Either a container reference to the service
* or a fully-qualified class name
* to use as the component implementation.
*/
public void registerService(String serviceName, String implementation) throws EventException {
if (!manager.getModel().isComponentRegistered(serviceName)) {
try {
builder.registerComponent(implementation, serviceName, receiveEndpoint, sendEndpoint);
} catch (UMOException e) {
throw new EventException(e.toString());
}
}
}
/**
* Unregisters a service from receiving event messages.
*
* @param serviceName The name associated with the service to unregister.
*/
public void unregisterService(String serviceName) throws EventException {
try {
builder.unregisterComponent(serviceName);
}
catch (UMOException e) {
throw new EventException(e.toString());
}
}
/**
* Sends an event message synchronously to a given service.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message
* @return Object The result, if any.
* @throws EventException on error
*/
public Object sendSynchronousEvent(String serviceName, Object payload) throws EventException {
try {
if (!manager.getModel().isComponentRegistered(serviceName)){
throw new EventException("Service: " + serviceName + " is not registered.");
}
String transformers = null;
Map messageProperties = null;
UMOMessage result = eventClient.sendDirect(serviceName, transformers, payload, messageProperties);
if (result == null) {
return null;
}
return result.getPayload();
} catch (UMOException e) {
throw new EventException(e.toString());
} catch (Exception e) {
throw new EventException(e.toString());
}
}
/**
* Sends an event message asynchronously.
*
* @param serviceName The name of the service to which the event
* message is to be sent.
* @param payload The content of the event message.
* @return FutureMessageResult The result, if any
* @throws EventException on error
*/
public FutureMessageResult sendAsynchronousEvent(String serviceName, Object payload) throws EventException {
FutureMessageResult result = null;
try {
if (!manager.getModel().isComponentRegistered(serviceName)) {
throw new EventException("Service: " + serviceName + " is not registered.");
}
String transformers = null;
Map messageProperties = null;
result = eventClient.sendDirectAsync(serviceName, transformers, payload, messageProperties);
} catch (UMOException e) {
throw new EventException(e.toString());
}
return result;
}
}
}
Mule框架依據消息有效載荷的類型來派遣消息。事件框架使用基于有效載荷的派遣機制,這種派遣機制把注冊到事件管理器中一般定義的事件方法作為事件接收器。下面的類定義了一個包含三個重載的receiveEvent()方法的服務:
package com.jeffhanson.mule;
import java.util.Date;
public class TestService{
public void receiveEvent(String eventMessage){
System.out.println("
TestService.receiveEvent(String) received " + "event message: " + eventMessage + "");
}
public void receiveEvent(Integer eventMessage){
System.out.println("
TestService.receiveEvent(Integer) received " +"event message: " + eventMessage + "");
}
public void receiveEvent(Date eventMessage){
System.out.println("
TestService.receiveEvent(Date) received " + "event message: " + eventMessage + "");
}
}
import java.util.Date;
public class TestService{
public void receiveEvent(String eventMessage){
System.out.println("
TestService.receiveEvent(String) received " + "event message: " + eventMessage + "");
}
public void receiveEvent(Integer eventMessage){
System.out.println("
TestService.receiveEvent(Integer) received " +"event message: " + eventMessage + "");
}
public void receiveEvent(Date eventMessage){
System.out.println("
TestService.receiveEvent(Date) received " + "event message: " + eventMessage + "");
}
}
事件管理器客戶端應用程序發送三個事件到測試服務中,去測試每一個receiveEvent()方法。客戶端應用程序如下:
package com.jeffhanson.mule;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.BasicConfigurator;
import java.util.Date;
public class EventClient{
static Logger logger = Logger.getLogger(EventClient.class);
public static void main(String[] args) {
// Set up a simple configuration that logs on the console.
BasicConfigurator.configure();
logger.setLevel(Level.ALL);
try {
EventManager eventManager = EventManagerFactory.getInstance("vm://");
eventManager.start();
String serviceName = TestService.class.getName();
String implementation = serviceName;
eventManager.registerService(serviceName, implementation);
Object result = eventManager.sendSynchronousEvent(serviceName, "A test message");
if (result != null) {
System.out.println("Event result: " + result.toString());
}
result = eventManager.sendSynchronousEvent(serviceName, new Integer(23456));
if (result != null) {
System.out.println("Event result: " + result.toString());
}
result = eventManager.sendSynchronousEvent(serviceName, new Date());
if (result != null) {
System.out.println("Event result: " + result.toString());
}
eventManager.stop();
}
catch (EventException e)
{
System.err.println(e.toString());
}
}
}
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import org.apache.log4j.BasicConfigurator;
import java.util.Date;
public class EventClient{
static Logger logger = Logger.getLogger(EventClient.class);
public static void main(String[] args) {
// Set up a simple configuration that logs on the console.
BasicConfigurator.configure();
logger.setLevel(Level.ALL);
try {
EventManager eventManager = EventManagerFactory.getInstance("vm://");
eventManager.start();
String serviceName = TestService.class.getName();
String implementation = serviceName;
eventManager.registerService(serviceName, implementation);
Object result = eventManager.sendSynchronousEvent(serviceName, "A test message");
if (result != null) {
System.out.println("Event result: " + result.toString());
}
result = eventManager.sendSynchronousEvent(serviceName, new Integer(23456));
if (result != null) {
System.out.println("Event result: " + result.toString());
}
result = eventManager.sendSynchronousEvent(serviceName, new Date());
if (result != null) {
System.out.println("Event result: " + result.toString());
}
eventManager.stop();
}
catch (EventException e)
{
System.err.println(e.toString());
}
}
}
Mule平臺簡化和抽象了前面所敘述框架的事件方面的處理,使得你發送和接收穿越一個層級結構的同步和異步消息時,不需要知道下層系統的細節。工廠模式和SOA準則的應用,則使得這個框架有了一個松散耦合和可擴展的設計。
總結
當服務和進程需要穿越多層結構,使用多種協議去交互時,設計一個有效地事件驅動的軟件系統可能變得復雜了。可是,一個使用標準模式包含適當事件管理層的面向服務架構能減少,甚至消除這些問題。
Mule 平臺提供API,組件和抽象對象,這些都可以用于去建立一個強大,健壯,事件驅動的有著良好的伸縮性和可維護性的系統。
@import url(http://www.aygfsteel.com/CuteSoft_Client/CuteEditor/Load.ashx?type=style&file=SyntaxHighlighter.css);@import url(/css/cuteeditor.css);