Duffblog

          前進一步,看看,需要前進更大一步才可以。

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            5 隨筆 :: 53 文章 :: 5 評論 :: 0 Trackbacks
          ?在Spring 2.0之前,Spring的JMS的作用局限于產生消息。這個功能(封裝在 JmsTemplate 類中)當然是很好的, 但是,它沒有描述完整的JMS堆棧,比如像消息的 異步 產生和消耗。JMS堆棧缺少的這一部分已經被添加,Spring 2.0現在提供對消息異步消耗的完整支持。
          ????? 讓我們從一個例子開始。
          ????? 首先我們打開ActiveMQ。從ActiveMQ的安裝路徑上的bin目錄,那里有一個ActiveMQ.bat,雙擊執行即可。不過要注意必須先設置java_home環境變量。ActiveMQ默認的服務端口是61616。
          ????? 然后我們開始配置Spring配置文件。我起名為spring-jms.xml
          1. 首先要配置一個ConnectionFactory代碼如下

          ?<bean id="connectionFactory"
          ??????? class="org.apache.activemq.ActiveMQConnectionFactory">
          ??????? <property name="brokerURL" value="tcp://localhost:61616" />
          </bean>

        1. 這里用到的ConnectionFactory是ActiveMQ提供的工廠,為了能使用這個工廠,我們必須在項目中添加以下幾個jar文件:
          geronimo-jms_1.1_spec-1.0.jar,
          activeio-core-3.0-beta3.jar,
          activemq-core-4.0.1.jar,
          backport-util-concurrent-2.1.jar,
          commons-logging-1.0.4.jar,
          geronimo-j2ee-management_1.0_spec-1.0.jar
          以上這些Jar文件都存在于ActiveMQ安裝目錄的lib目錄下,這些可是我一個一個試驗出來的,累個半死。。

        2. 然后應該配置一個Queue(我使用的是點對點方式),不過ActiveMQ只要提供一個名字就可以自動創建隊列,因此這一步省了,呵呵
        3. 下面就輪到Spring的支持類了,首先是JmsTemplate。這個類提供了大量的方法簡化我們對JMS的操作。常用的有兩個, org.springframework.jms.core.JmsTemplate102和 org.springframework.jms.core.JmsTemplate,這兩個類分別支持JMS的1.02版本和1.1版本?,F在比較常用的還是1.02版本。配置如下

          <bean id="jmsTemplate"
          ??????? class="org.springframework.jms.core.JmsTemplate102">
          ??????? <property name="connectionFactory" ref="connectionFactory"/>
          ??????? <property name="timeToLive" value="86400000"/>
          ??????? <property name="defaultDestinationName" value="cmpp" />
          ??????? <property name="messageConverter" ref="messageConverter" />
          ??????? <property name="receiveTimeout" value="30000" />
          ??? </bean>
          上面的配置中用到了第一步配置的connectionFactory以及一個消息轉換的類 messageConverter,這個類實現了 org.springframework.jms.support.converter.MessageConverter接口,可以在消息發送之前和接受之后進行消息類型轉換。具體的看最后的實例代碼。配置代碼如下:
          ?<!-- Spring JMS SimpleConverter -->
          ?????? <bean id="simpleConverter"? class="org.springframework.jms.support.converter.SimpleMessageConverter" />
          <!-- Message Converter -->
          ??? <bean id="messageConverter"
          ??????? class="com.liangj.apmgt.jms.ApmgtMessageConverter">
          ??????? <property name="converter">
          ??????????? <ref local="simpleConverter" />
          ??????? </property>
          ??? </bean>

        4. 這里還配置了發送的消息的存在時間timeToLive,目標Queue的名字defaultDestinationName,接受消息超時時間receiveTimeout

          4。配置發送代碼
          ??? <bean id="producer"
          ??????? class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer">
          ??????? <property name="jmsTemplate" ref="jmsTemplate" />
          ??? </bean>
          5。接著配置監聽器,這是Spring2.0新增的功能,配置如下:
          <!-- this is the Message Driven POJO (MDP) -->
          ??? <bean id="messageListener"
          ??????? class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
          ??????? <constructor-arg>
          ??????????? <bean
          ??????????????? class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" />
          ??????? </constructor-arg>
          ??????? <property name="defaultListenerMethod" value="onMessage" />
          ??????? <property name="messageConverter" ref="messageConverter" />
          ??? </bean>

          ??? <!-- and this is the attendant message listener container -->
          ??? <bean id="listenerContainer"
          ??????? class="org.springframework.jms.listener.DefaultMessageListenerContainer">
          ??????? <property name="connectionFactory" ref="connectionFactory" />
          ??????? <property name="destinationName" value="${jms.destinationName.cmpp}" />
          ??????? <property name="messageSelector" value="${jms.messageSelector}" />
          ??????? <property name="messageListener" ref="messageListener" />
          ??? </bean>

          ?Spring配置監聽器有很多種選擇,在這里我選擇這回種MessageListenerAdapter方法主要是因為這個方法比較靈活。實現他只要一個很普通的java類即可,和JMS以及Spring的耦合度最低。其中方法onMessage可以隨便修改方法名,只要在配置文件中對應的修改就好了。
          ???? MessageListenerAdapter還有一個功能就是如果處理方法(我這里是onMessage)返回一個非空值,它將自動返回一個響應消息。這個消息會返回給JMS Reply-To屬性定義的目的地(如果存在),或者是MessageListenerAdapter設置(如果配置了)的缺省目的地;如果沒有定義目的地,那么將產生一個InvalidDestinationException異常(此異常將不會只被捕獲而不處理,它沿著調用堆棧上傳)。
          ???? 這樣我們的配置就都完成了。接下來我們來實現對應的Java文件
          先是接口文件發送消息接口IApmgtMessageProducer.java
          public interface IApmgtMessageProducer {
          ??? public abstract void sendMessage(ApmgtMessageData messageData);
          }
          接受消息接口IApmgtMessageListener.java
          public interface IApmgtMessageListener {
          ??? public void onMessage(ApmgtMessageData message);
          }
          發消息的文件DefaultApmgtMessageProducer.java
          ?public class DefaultApmgtMessageProducer implements IApmgtMessageProducer {
          ???
          ??? private JmsTemplate jmsTemplate;

          ??? public void setJmsTemplate(JmsTemplate jmsTemplate) {
          ??????? this.jmsTemplate = jmsTemplate;
          ??? }

          ??? public void sendMessage(ApmgtMessageData messageData) {
          ??????? this.jmsTemplate.convertAndSend(messageData);
          ??? }
          }
          收消息文件DefaultApmgtMessageListener.java
          public class DefaultApmgtMessageListener implements IApmgtMessageListener {
          ??? public void onMessage(ApmgtMessageData message) {
          ??????? System.out.println("監聽到消息:"+message);
          ??? }
          }
          消息轉換類ApmgtMessageConverter.java
          public class ApmgtMessageConverter implements MessageConverter {

          ??? private Log log = LogFactory.getLog(ApmgtMessageConverter.class);

          ??? private SimpleMessageConverter converter;

          ??? public void setConverter(SimpleMessageConverter converter) {
          ??????? this.converter = converter;
          ??? }

          ??? public Object fromMessage(Message message) throws JMSException, MessageConversionException {
          ??????? if (message instanceof ObjectMessage) {
          ??????????? ObjectMessage o_message = (ObjectMessage)message;
          ??????????? MessageHeader header = new MessageHeader();
          ??????????? header.setId(message.getLongProperty("id"));
          ??????????? header.setReceiver(message.getIntProperty("receiver"));
          ??????????? header.setSender(message.getIntProperty("sender"));
          ??????????? header.setSendPerson(message.getStringProperty("sendPerson"));
          ??????????? header.setType(message.getIntProperty("type"));
          ??????????? Serializable messageContent = o_message.getObject();
          ??????????? ApmgtMessageData<Serializable> messageData = new ApmgtMessageData<Serializable>();
          ??????????? messageData.setMessageContent(messageContent);
          ??????????? messageData.setMessageHeader(header);
          ??????????? return messageData;
          ??????? }
          ??????? return null;
          ??? }

          ??? public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
          ??????? if (object instanceof ApmgtMessageData) {
          ??????????? ApmgtMessageData data = (ApmgtMessageData) object;
          ??????????? Message message = converter.toMessage(data.getMessageContent(), session);
          ??????????? message.setLongProperty("id", data.getMessageHeader().getId());
          ??????????? message.setIntProperty("receiver", data.getMessageHeader().getReceiver());
          ??????????? message.setIntProperty("sender", data.getMessageHeader().getSender());
          ??????????? message.setIntProperty("type", data.getMessageHeader().getType());
          ??????????? message.setStringProperty("sendPerson", data.getMessageHeader().getSendPerson());
          ??????????? log.info("發送消息[MessageSender]:\n" + message);
          ???????????? return message;
          ??????? } else {
          ??????????? return null;
          ??????? }
          ??? }

          }
          消息類文件? 消息父類:ApmgtMessageData.java
          public class ApmgtMessageData<T extends Serializable>{

          ??? protected T messageContent;
          ???
          ??? protected MessageHeader messageHeader;

          ??? public T getMessageContent() {
          ??????? return this.messageContent;
          ??? }

          ??? public MessageHeader getMessageHeader() {
          ??????? return this.messageHeader;
          ??? }

          ??? public void setMessageContent(T messageContent) {
          ??????? this.messageContent = messageContent;
          ??? }
          ??? public void setMessageHeader(MessageHeader messageHeader) {
          ??????? this.messageHeader = messageHeader;
          ??? }

          }
          消息屬性的一個類MessageHeader.java
          public class MessageHeader {

          ??? /**
          ???? * 消息ID
          ???? */
          ??? private long id;

          ??? /**
          ???? * 消息類型
          ???? */
          ??? private int type;

          ??? /**
          ???? * 消息發送方,發送消息的模塊
          ???? */
          ??? private int sender;

          ??? /**
          ???? * 消息接收方,接收消息的模塊
          ???? */
          ??? private int receiver;

          ??? /**
          ???? * 消息發送者,具體的用戶
          ???? */
          ??? private String sendPerson;

          ??? public MessageHeader(){
          ??????? this.id = System.currentTimeMillis() ;
          ??? }
          ???
          ??? public long getId() {
          ??????? return id;
          ??? }

          ??? public void setId(long id) {
          ??????? this.id = id;
          ??? }

          ??? public String getSendPerson() {
          ??????? return sendPerson;
          ??? }

          ??? public void setSendPerson(String sendPerson) {
          ??????? this.sendPerson = sendPerson;
          ??? }

          ??? public int getReceiver() {
          ??????? return receiver;
          ??? }

          ??? public void setReceiver(int receiver) {
          ??????? this.receiver = receiver;
          ??? }

          ??? public int getSender() {
          ??????? return sender;
          ??? }

          ??? public void setSender(int sender) {
          ??????? this.sender = sender;
          ??? }

          ??? public int getType() {
          ??????? return type;
          ??? }

          ??? public void setType(int type) {
          ??????? this.type = type;
          ??? }

          }

          消息體的類ModPasswordRequest.java
          public class ModPasswordRequest implements Serializable{


          ??? private static final long serialVersionUID = 1L;

          ??? /**
          ???? * 舊密碼
          ???? */
          ??? private String oldPassword;
          ???
          ??? /**
          ???? * 新密碼
          ???? */
          ??? private String newPassword;

          ??? public String getNewPassword() {
          ??????? return newPassword;
          ??? }

          ??? public void setNewPassword(String newPassword) {
          ??????? this.newPassword = newPassword;
          ??? }

          ??? public String getOldPassword() {
          ??????? return oldPassword;
          ??? }

          ??? public void setOldPassword(String oldPassword) {
          ??????? this.oldPassword = oldPassword;
          ??? }

          }

          消息類:ApmgtModPasswordRequest.java
          public class ApmgtModPasswordRequest extends ApmgtMessageData<ModPasswordRequest> {
          ???
          ???
          ???
          ??? private static final int REQ_MODPASSWORD = 0;
          ??? private static final int INTF = 1;
          ??? private static final int APMGT = 2;

          ??? public void init(){
          ??????? messageHeader = new MessageHeader();
          ??????? messageContent = new ModPasswordRequest();
          ??????? messageHeader.setType(REQ_MODPASSWORD);
          ??????? messageHeader.setSender(INTF);
          ??????? messageHeader.setReceiver(APMGT);
          ??????? messageContent.setNewPassword("123456");
          ??????? messageContent.setOldPassword("654321");
          ??? }
          ??
          }
          最后是測試類Main.java

          ??? public class Main {

          ??? public static void main(final String[] args) throws Exception {
          ???????
          ??????? PropertyConfigurator.configure("log4j.properties");
          ???????
          ??????? AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] { "spring-jms.xml" });
          //??????? ctx.registerShutdownHook();
          ???????
          ??????? IApmgtMessageProducer producer = (IApmgtMessageProducer)ctx.getBean("producer");
          ???????
          ??????? ApmgtModPasswordRequest messageData = new ApmgtModPasswordRequest();
          ??????? messageData.setMessageHeader(new MessageHeader());
          ??????? messageData.setMessageContent(new ModPasswordRequest());
          ??????? messageData.init();
          ???????
          ??????? producer.sendMessage(messageData);
          ??? }
          }
          還有兩個配置文件,第一個spring-jms.xml
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="[url]http://www.springframework.org/schema/beans"[/url]
          ??? xmlns:xsi="[url]http://www.w3.org/2001/XMLSchema-instance"[/url]
          ??? xsi:schemaLocation="[url]http://www.springframework.org/schema/beans[/url] [url]http://www.springframework.org/schema/beans/spring-beans.xsd">[/url]


          ??? <bean id="propertyConfigurer"
          ??????? class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
          ??????? <property name="locations">
          ??????????? <list>
          ??????????????? <value>apmgt.properties</value>
          ??????????? </list>
          ??????? </property>
          ??? </bean>

          ??? <!-- ####################################### -->
          ??? <!--????????? JMS Spring Beans?????????????? -->
          ??? <!-- ####################################### -->

          ??? <!-- Jms ConnectionFactory -->
          ??? <bean id="connectionFactory"
          ??????? class="org.apache.activemq.ActiveMQConnectionFactory">
          ??????? <property name="brokerURL" value="${jms.brokerURL}" />
          ??? </bean>

          ??? <!-- Spring JMS SimpleConverter -->
          ??? <bean id="simpleConverter"
          ??????? class="org.springframework.jms.support.converter.SimpleMessageConverter" />

          ??? <!-- JMS Queue Template -->
          ??? <bean id="jmsTemplate"
          ??????? class="org.springframework.jms.core.JmsTemplate102">
          ??????? <property name="connectionFactory" ref="connectionFactory"/>
          ??????? <property name="timeToLive" value="${jms.timeToLive}"/>
          ??????? <property name="defaultDestinationName" value="${jms.destinationName.cmpp}" />
          ??????? <property name="messageConverter" ref="messageConverter" />
          ??????? <property name="receiveTimeout" value="${jms.receiveTimeout}" />
          ??? </bean>

          ??? <!-- Message Converter -->
          ??? <bean id="messageConverter"
          ??????? class="com.liangj.apmgt.jms.ApmgtMessageConverter">
          ??????? <property name="converter">
          ??????????? <ref local="simpleConverter" />
          ??????? </property>
          ??? </bean>

          ??? <!-- Message porducer -->
          ??? <bean id="producer"
          ??????? class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer">
          ??????? <property name="jmsTemplate" ref="jmsTemplate" />
          ??? </bean>

          ??? <!-- this is the Message Driven POJO (MDP) -->
          ??? <bean id="messageListener"
          ??????? class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
          ??????? <constructor-arg>
          ??????????? <bean
          ??????????????? class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" />
          ??????? </constructor-arg>
          ??????? <property name="defaultListenerMethod" value="onMessage" />
          ??????? <property name="messageConverter" ref="messageConverter" />
          ??? </bean>

          ??? <!-- and this is the attendant message listener container -->
          ??? <bean id="listenerContainer"
          ??????? class="org.springframework.jms.listener.DefaultMessageListenerContainer">
          ??????? <property name="connectionFactory" ref="connectionFactory" />
          ??????? <property name="destinationName" value="${jms.destinationName.cmpp}" />
          ??????? <property name="messageSelector" value="${jms.messageSelector}" />
          ??????? <property name="messageListener" ref="messageListener" />
          ??? </bean>
          </beans>

          apmgt.properties

          #jms properties
          jms.brokerURL=tcp://localhost:61616
          jms.receiveTimeout=3000
          jms.destinationName.cmpp=cmpp
          jms.messageSelector=receiver=2
          #one day is 86400000 ms. 0 is means that it lives forever.
          jms.timeToLive=86400000

          轉:http://blog.iecn.net/blog/html/do-showone-tid-1035.html

        5. posted on 2006-12-28 22:12 追球者 閱讀(3146) 評論(1)  編輯  收藏 所屬分類: 開源技術

          評論

          # re: 結合Spring2.0和ActiveMQ進行異步消息調用(轉)[未登錄] 2010-07-23 15:29 leven
          有個問題不明白,就是是否不用啟動監聽容器?那么接收端不用啟動就能獲取到消息了?  回復  更多評論
            


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 邵东县| 泰兴市| 渝北区| 稷山县| 西盟| 怀化市| 清丰县| 花莲县| 兴安县| 永吉县| 晋宁县| 介休市| 乐山市| 沿河| 贡觉县| 巴彦淖尔市| 紫阳县| 伽师县| 汤原县| 伊春市| 烟台市| 高州市| 玉林市| 永安市| 闸北区| 毕节市| 秦皇岛市| 娱乐| 西畴县| 建始县| 临城县| 沙湾县| 泌阳县| 繁峙县| 嘉峪关市| 雷波县| 合水县| 黑山县| 韶关市| 福泉市| 邻水|