瘋狂

          STANDING ON THE SHOULDERS OF GIANTS
          posts - 481, comments - 486, trackbacks - 0, articles - 1
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          ActiveMQ

          Posted on 2010-01-29 10:14 瘋狂 閱讀(2096) 評論(0)  編輯  收藏 所屬分類: spring

          企業中各項目中相互協作的時候可能用得到消息通知機制。比如有東西更新了,可以通知做索引。

          在 Java 里有 JMS 的多個實現。其中 apache 下的 ActiveMQ 就是不錯的選擇。還有一個比較熱的是 RabbitMQ (是 erlang 語言實現的)。這里示例下使用 ActiveMQ

          用 ActiveMQ 最好還是了解下 JMS

          JMS 公共 點對點域 發布/訂閱域
          ConnectionFactory QueueConnectionFactory TopicConnectionFactory
          Connection QueueConnection TopicConnection
          Destination Queue Topic
          Session QueueSession TopicSession
          MessageProducer QueueSender TopicPublisher
          MessageConsumer QueueReceiver TopicSubscriber

          JMS 定義了兩種方式:Quere(點對點);Topic(發布/訂閱)。

          ConnectionFactory 是連接工廠,負責創建Connection。

          Connection 負責創建 Session。

          Session 創建 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息)。

          Destination 是消息的目的地。

          詳細的可以網上找些 JMS 規范(有中文版)。

          下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html,解壓,然后雙擊 bin/activemq.bat。運行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo。把 activemq-all-5.3.0.jar 加入 classpath。

          Jms 發送 代碼:

          1. public static void main(String[] args) throws Exception {   
          2.     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
          3.   
          4.     Connection connection = connectionFactory.createConnection();   
          5.     connection.start();   
          6.   
          7.     Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
          8.     Destination destination = session.createQueue("my-queue");   
          9.   
          10.     MessageProducer producer = session.createProducer(destination);   
          11.     for(int i=0; i<3; i++) {   
          12.         MapMessage message = session.createMapMessage();   
          13.         message.setLong("count"new Date().getTime());   
          14.         Thread.sleep(1000);   
          15.         //通過消息生產者發出消息   
          16.         producer.send(message);   
          17.     }   
          18.     session.commit();   
          19.     session.close();   
          20.     connection.close();   
          21. }  

          Jms 接收代碼:

          1. public static void main(String[] args) throws Exception {   
          2.     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
          3.   
          4.     Connection connection = connectionFactory.createConnection();   
          5.     connection.start();   
          6.   
          7.     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
          8.     Destination destination = session.createQueue("my-queue");   
          9.   
          10.     MessageConsumer consumer = session.createConsumer(destination);   
          11.     /*//listener 方式  
          12.     consumer.setMessageListener(new MessageListener() {  
          13.  
          14.         public void onMessage(Message msg) {  
          15.             MapMessage message = (MapMessage) msg;  
          16.             //TODO something....  
          17.             System.out.println("收到消息:" + new Date(message.getLong("count")));  
          18.             session.commit();  
          19.         }  
          20.  
          21.     });  
          22.     Thread.sleep(30000);  
          23.     */  
          24.     int i=0;   
          25.     while(i<3) {   
          26.         i++;   
          27.         MapMessage message = (MapMessage) consumer.receive();   
          28.         session.commit();   
          29.   
          30.         //TODO something....   
          31.         System.out.println("收到消息:" + new Date(message.getLong("count")));   
          32.     }   
          33.   
          34.     session.close();   
          35.     connection.close();   
          36. }  

          啟動 JmsReceiver 和 JmsSender 可以在看輸出三條時間信息。當然 Jms 還指定有其它格式的數據,如 TextMessage

          結合 Spring 的 JmsTemplate 方便用:

          xml:

          1. <?xml version="1.0" encoding="UTF-8"?>  
          2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
          3.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  
          4.   
          5. <!-- 在非 web / ejb 容器中使用 pool 時,要手動 stop,spring 不會為你執行 destroy-method 的方法   
          6.     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  
          7.         <property name="connectionFactory">  
          8.             <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
          9.                 <property name="brokerURL" value="tcp://localhost:61616" />  
          10.             </bean>  
          11.         </property>  
          12.     </bean>  
          13. -->  
          14.     <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
          15.         <property name="brokerURL" value="tcp://localhost:61616" />  
          16.     </bean>  
          17.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
          18.         <property name="connectionFactory" ref="jmsFactory" />  
          19.         <property name="defaultDestination" ref="destination" />  
          20.         <property name="messageConverter">  
          21.             <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />  
          22.         </property>  
          23.     </bean>  
          24.   
          25.     <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">  
          26.         <constructor-arg index="0" value="my-queue" />  
          27.     </bean>  
          28.   
          29. </beans>  

          sender:

          1. public static void main(String[] args) {   
          2.     ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");   
          3.   
          4.     JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");   
          5.   
          6.     jmsTemplate.send(new MessageCreator() {   
          7.   
          8.         public Message createMessage(Session session) throws JMSException {   
          9.             MapMessage mm = session.createMapMessage();   
          10.             mm.setLong("count"new Date().getTime());   
          11.             return mm;   
          12.         }   
          13.   
          14.     });   
          15. }  

          receiver:

          1. public static void main(String[] args) {   
          2.     ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:app*.xml");   
          3.   
          4.     JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");   
          5.     while(true) {   
          6.         Map<String, Object> mm =  (Map<String, Object>) jmsTemplate.receiveAndConvert();   
          7.         System.out.println("收到消息:" + new Date((Long)mm.get("count")));   
          8.     }   
          9. }  

          注意:直接用 Jms 接口時接收了消息后要提交一下,否則下次啟動接收者時還可以收到舊數據。有了 JmsTemplate 就不用自己提交 session.commit() 了。如果使用了 PooledConnectionFactory 要把 apache-activemq-5.3.0\lib\optional\activemq-pool-5.3.0.jar 加到 classpath

          主站蜘蛛池模板: 手游| 高陵县| 胶州市| 新疆| 泰安市| 洪洞县| 尚志市| 泸水县| 嘉定区| 大荔县| 澜沧| 筠连县| 涟源市| 建瓯市| 白山市| 康保县| 米易县| 婺源县| 高邑县| 江津市| 四平市| 尼木县| 济宁市| 如皋市| 凉城县| 石泉县| 黑龙江省| 江门市| 万全县| 乡城县| 桃园市| 柘城县| 柯坪县| 林西县| 五大连池市| 吉木乃县| 营口市| 怀宁县| 监利县| 东平县| 江油市|