1.本來trigger receiver流程的模塊和接收者類是放在一個APP Server上的,但由于性能的考慮,這種schedule模塊的調度和管理可能會影響業務邏輯的執行,占用業務邏輯執行的系統資源,所以將它放到單獨的JVM上運行,作為一個Standalone的java application。這樣schedule模塊就不能直接通過內存調用接收者流程,接收者必須開放遠程rpc服務,讓trigger通過遠程調用的方法主動調用消息接收者去接受消息。
原來的系統通過MessageReceiverFactory獲得一個MessageReceiver 然后主動接收消息。
MessageReceiver:
public interface MessageReceiver { void invokeProcessFlow(String processId) throws ServerReceiverException; } |
ServerMessageReceiver:
public class ServerMessageReceiver implements MessageReceiver { ServerMessageReceiver() { } public void invokeProcessFlow(String processId) throws ServerReceiverException { //receive message } |
MessageReceiverFactory:
public class MessageReceiverFactory { private static MessageReceiver messageReceiver; private static void init() throws ServerReceiverException { messageReceiver = new ServerMessageReceiver(); } public synchronized static MessageReceiver getMessageReceiver() throws ServerReceiverException { if(messageReceiver == null) { init(); } return messageReceiver; } } |
2.增加RMI服務后:
客戶端:
· 客戶端通過MessageReceiverFactory獲得MessageReceiver的遠程版本:RemoteMessageReceiver
· 客戶端使用遠程版本的MessageReceiver look up出RemotableMessageReceiver的RMI Stub進行遠程的RPC調用,通知服務器端接收消息。
public class RemoteMessageReceiver implements MessageReceiver { private RemotableMessageReceiver messageReceiver; private String rmiHost; private int rmiPort; private String serviceName; public RemoteMessageReceiver(String host, int port, String serviceName) throws ServerReceiverException { try{ this.rmiHost = host; this.rmiPort = port; this.serviceName = serviceName; Registry registry = LocateRegistry.getRegistry(rmiHost, rmiPort); messageReceiver = (RemotableMessageReceiver) registry.lookup(this.serviceName); }catch(Exception ex) { throw new ServerReceiverException("look up remote message receiver failed!", this, ex); } } public void invokeProcessFlow(String processId) throws ServerReceiverException { try { this.messageReceiver.invokeProcessFlow(processId); } catch (RemoteException e) { throw new ServerReceiverException(e.getMessage(), this, e); } } } |
這里用了適配器模式,對RemotableMessageReceiver進行適配,從而滿足MessageReceiver接口的標準,方便MessageReceiverFactory的統一生產,并且使客戶端使用的RemoteMessageReceiver和具體使用的通信方法解耦,將來如果不使用RMI, 只需要替換RemotableMessageReceiver就可以了。不會影響客戶端的代碼。
服務器端:
- 服務器端設置開關,可以開啟和關閉遠程服務。
- 如果開啟遠程服務,就需要注冊服務,注冊的服務對象是提供給客戶端用的遠程對象,服務端本身使用的MessageReceiver不需要實現RemotableMessageReceiver接口和原來一樣實現MessageReceiver接口,從而原有的應用并沒有太大影響。
- 服務器端通過MessageReceiverFactory獲得MessageReceiver也可以主動接受消息,如果需要提供遠程服務,就必須注冊遠程服務。
這樣,服務器端首先增加了RemotableMessageReceiver接口,所有遠程對象實現該接口,RMIMessageReceiver提供RMI服務的遠程對象,供客戶端遠程調用。RMIMessageReceiver必須滿足RemotableMessageReceiver接口的契約,所以也使用了適配器模式。增加了RMIMessageReceiver后,服務器端使用的MessageReceiver就和RMIMessageReceiver耦合在一起了,為了解耦,我又新增了一個StdMessageReceiver類,作為服務器端本地的MessageReceiver,供服務器端本地調用,StdMessageReceiver用了裝飾器模式,對已有的MessageReceiver進行修飾,支持服務器端本地調用,如果以后本地調用加了新的功能就不會影響RMIMessageReceiver的功能了,比如凡是本地調用都需要在服務器端記log,這樣就只需要在StdMessageReceiver添加打log的功能,而不會影響RMIMessageReceiver。
MessageReceiverFactory簡單工廠用于生產MessageReceiver對象,可能是客戶端的RemoteMessageReceiver也可能是服務器端的StdMessageReceiver。
為了在deploy時候就啟動RMI服務,我們可以在servlet的init方法中初始化并注冊RMI服務,在deetroy方法中相應的取消RMI服務。
RemotableMessageReceiver: 遠程服務對象接口
public interface RemotableMessageReceiver extends Remote { void invokeProcessFlow(String processId) throws ServerReceiverException, RemoteException; } |
RMIMessageReceiver:提供RMI服務的遠程對象
public class RMIMessageReceiver implements RemotableMessageReceiver { private MessageReceiver messageReceiver; RMIMessageReceiver(MessageReceiver messageReceiver) throws RemoteException { this.messageReceiver = messageReceiver; SystemInfo sysInfo = ContextFactory.getSystemConfigContext().getSystemInfo(); System.out.println("beign to bind at rmiport: " + sysInfo.getRmiport()); RMIUtils.bind(sysInfo.getRmiport(), sysInfo .getReceiverRmiBindName(), this); } public void invokeProcessFlow(String processId) throws ServerReceiverException, RemoteException { this.messageReceiver.invokeProcessFlow(processId); } } |
SystemInfo存儲的是一些配置信息,比如是不是服務器端,是不是需要開啟遠程服務,如果開啟的話相應的host,port和rmi遠程對象綁定的服務名
StdMessageReceiver:服務器端使用的MessageReceiver。
public class StdMessageReceiver implements MessageReceiver { private MessageReceiver messageReceiver; public StdMessageReceiver(MessageReceiver messageReceiver) { this.messageReceiver = messageReceiver; } public void invokeProcessFlow(String processId) throws ServerReceiverException { //do some log this.messageReceiver.invokeProcessFlow(processId); } } |
MessageReceiverFactory: 生產MessageReceiver的簡單工廠:
public class MessageReceiverFactory { private static MessageReceiver messageReceiver; private static RemotableMessageReceiver remotableMessageReceiver; private static void init() throws ServerReceiverException { SystemInfo sysInfo = ContextFactory.getSystemConfigContext() .getSystemInfo(); if (sysInfo.isServer()) { MessageReceiver mifMessageReceiver = new ServerMessageReceiver(); if (sysInfo.isRemoteable()) { try { remotableMessageReceiver = new RMIMessageReceiver(mifMessageReceiver); } catch (RemoteException e) { throw new ServerReceiverException( "bind Message Receiver to port: " + sysInfo.getRmiport() + " with service name: " + sysInfo.getReceiverRmiBindName(), e); } } messageReceiver = new StdMessageReceiver(mifMessageReceiver); } else { String rmiHost = sysInfo.getRmiHost(); int rmiPort = sysInfo.getRmiport(); String serviceName = sysInfo.getReceiverRmiBindName(); messageReceiver = createRemoteMesageReceiver(rmiHost, rmiPort, serviceName); } } public static MessageReceiver createRemoteMesageReceiver(String rmiHost, int rmiPort, String serviceName) throws ServerReceiverException { return new RemoteMessageReceiver(rmiHost, rmiPort, serviceName); } public synchronized static MessageReceiver getMessageReceiver() throws ServerReceiverException { if(messageReceiver == null) { init(); } return messageReceiver; } public static RemotableMessageReceiver getRemoteableMesageReceiver(){ return remotableMessageReceiver; } public synchronized static void initMessageReceiver() throws ServerReceiverException { init(); } } |
StartUpServlet:啟動時(deploy)時parse config并且初始化MessageReceiver
publicclass StartupServlet extends HttpServlet { public void init(ServletConfig config) throws ServletException { try { //parse configuration
//init rmi service MessageReceiverFactory.initMessageReceiver();
} catch (Exception ex) { ex.printStackTrace(); thrownew ServletException(ex.getMessage(), ex); } } publicvoid destroy() { try {
RemotableMessageReceiver remotableMessageReceiver = MessageReceiverFactory .getRemoteableMesageReceiver(); if (remotableMessageReceiver != null) { SystemInfo sysInfo = ContextFactory.getSystemConfigContext() .getSystemInfo(); RMIUtils.unBind(sysInfo.getRmiport(), sysInfo .getReceiverRmiBindName(), remotableMessageReceiver); } } catch (Exception e) { e.printStackTrace(); } } } |
RMIUtils: 工具類提供rmi注冊和撤銷服務的功能:
public class RMIUtils { public static void bind(int rmiPort, String serviceName, Remote remoteObject) throws RemoteException { SystemInfo sysInfo = ContextFactory.getSystemConfigContext() .getSystemInfo(); Remote exportable = (Remote) UnicastRemoteObject .exportObject(remoteObject); Registry registry = null; try { registry = LocateRegistry.getRegistry(sysInfo.getRmiport()); registry.list(); } catch (Exception e) { registry = LocateRegistry.createRegistry(sysInfo.getRmiport()); } registry.list(); System.out.println("bind the service: " + serviceName); String bindName = serviceName; registry.rebind(bindName, exportable); } public static void unBind(int rmiPort, String serviceName, Remote remoteObject) throws RemoteException { SystemInfo sysInfo = ContextFactory.getSystemConfigContext() .getSystemInfo(); Registry registry = LocateRegistry.getRegistry(sysInfo.getRmiport()); String bindName = serviceName; try { registry.unbind(bindName); UnicastRemoteObject.unexportObject(remoteObject, true); } catch (java.rmi.NotBoundException nbe) { } } } |