fun

           

          activeMQ筆記之一

          1.JMS介紹
              JMS源于企業(yè)應(yīng)用對于消息中間件的需求,使應(yīng)用程序可以通過消息進(jìn)行異步處理而互不影響。Sun公司和它的合作伙伴設(shè)計(jì)的JMS API定義了一組公共的應(yīng)用程序接口和相應(yīng)語法,使得Java程序能夠和其他消息組件進(jìn)行通信。JMS有四個組成部分:JMS服務(wù)提供者、消息管理對象、消息的生產(chǎn)者消費(fèi)者和消息本身。
          1)JMS服務(wù)提供者實(shí)現(xiàn)消息隊(duì)列和通知,同時(shí)實(shí)現(xiàn)消息管理的API。JMS已經(jīng)是J2EE API的一部分,J2EE服務(wù)器都提供JMS服務(wù)。
          2) 消息管理對象提供對消息進(jìn)行操作的API。JMS API中有兩個消息管理對象:創(chuàng)建jms連接使用的工廠(ConnectionFactory)和目的地(Destination),根據(jù)消息的消費(fèi)方式的不同ConnectionFactory可以分為QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分為隊(duì)列(Queue)和主題(Topic)兩種。
          3)消息的生產(chǎn)者和消費(fèi)者。消息的產(chǎn)生由JMS的客戶端完成,JMS服務(wù)提供者負(fù)責(zé)管理這些消息,消息的消費(fèi)者可以接收消息。消息的生產(chǎn)者可以分為――點(diǎn)對點(diǎn)消息發(fā)布者(P2P)和主題消息發(fā)布者(TopicPublisher)。所以,消息的消費(fèi)者分為兩類:主題消息的訂閱者(TopicSubscriber)和點(diǎn)對點(diǎn)消息的接收者(queue receiver)
          4)消息。消息是服務(wù)提供者和客戶端之間傳遞信息所使用的信息單元。JMS消息由以下三部分組成:
            消息頭(header)――JMS消息頭包含了許多字段,它們是消息發(fā)送后由JMS提供者或消息發(fā)送者產(chǎn)生,用來表示消息、設(shè)置優(yōu)先權(quán)和失效時(shí)間等等,并且為消息確定路由。
            屬性(property)――用來添加刪除消息頭以外的附加信息。
            消息體(body)――JMS中定義了5種消息體:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

          2.Messages 通信方式
          上面提到JMS通信方式分為點(diǎn)對點(diǎn)通信和發(fā)布/訂閱方式
          1)點(diǎn)對點(diǎn)方式(point-to-point)
             點(diǎn)對點(diǎn)的消息發(fā)送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder 發(fā)送消息,receive接收消息.具體點(diǎn)就是Sender Client發(fā)送Message Queue ,而 receiver Cliernt從Queue中接收消息和"發(fā)送消息已接受"到Quere,確認(rèn)消息接收。消息發(fā)送客戶端與接收客戶端沒有時(shí)間上的依賴,發(fā)送客戶端可以在任何時(shí)刻發(fā)送信息到Queue,而不需要知道接收客戶端是不是在運(yùn)行
          2)發(fā)布/訂閱 方式(publish/subscriber Messaging)
              發(fā)布/訂閱方式用于多接收客戶端的方式.作為發(fā)布訂閱的方式,可能存在多個接收客戶端,并且接收端客戶端與發(fā)送客戶端存在時(shí)間上的依賴。一個接收端只能接收他創(chuàng)建以后發(fā)送客戶端發(fā)送的信息。作為subscriber ,在接收消息時(shí)有兩種方法,destination的receive方法,和實(shí)現(xiàn)message listener 接口的onMessage 方法。

          3.為什么選用ActiveMQ
             1)ActiveMQ是一個開放源碼
             2)基于Apache 2.0 licenced 發(fā)布并實(shí)現(xiàn)了JMS 1.1。
             3)ActiveMQ現(xiàn)在已經(jīng)和作為很多項(xiàng)目的異步消息通信核心了
             4)在很多中小型項(xiàng)目中采用ActiveMQ+SPRING+TOMCAT開發(fā)模式。

          4.編程模式
          4.1消息產(chǎn)生者向JMS發(fā)送消息的步驟
          (1)創(chuàng)建連接使用的工廠類JMS ConnectionFactory
          (2)使用管理對象JMS ConnectionFactory建立連接Connection
          (3)使用連接Connection 建立會話Session
          (4)使用會話Session和管理對象Destination創(chuàng)建消息生產(chǎn)者M(jìn)essageSender
          (5)使用消息生產(chǎn)者M(jìn)essageSender發(fā)送消息
          4.2消息消費(fèi)者從JMS接受消息的步驟
          (1)創(chuàng)建連接使用的工廠類JMS ConnectionFactory
          (2)使用管理對象JMS ConnectionFactory建立連接Connection
          (3)使用連接Connection 建立會話Session
          (4)使用會話Session和管理對象Destination創(chuàng)建消息消費(fèi)者M(jìn)essageReceiver
          (5)使用消息消費(fèi)者M(jìn)essageReceiver接受消息,需要用setMessageListener將MessageListener接口綁定到MessageReceiver
          消息消費(fèi)者必須實(shí)現(xiàn)了MessageListener接口,需要定義onMessage事件方法。

          5.ActiveMQ運(yùn)行
          ActiveMQ5.0版本默認(rèn)啟動時(shí),啟動了內(nèi)置的jetty服務(wù)器,提供一個demo應(yīng)用和用于監(jiān)控ActiveMQ的admin應(yīng)用。運(yùn)行%activemq_home%bin/目錄下的 activemq.bat , 之后你會看見如下一段話表示啟動成功。
          打開http://www.bt285.cn /admin/queues.jsp ,可以查看相應(yīng)的queue中是否有消息

          6.SendMessage(用于發(fā)送消息)

          import javax.jms.Connection;   
          import javax.jms.Destination;   
          import javax.jms.JMSException;   
          import javax.jms.MessageProducer;   
          import javax.jms.Session;   
          import javax.jms.TextMessage;   
          import org.apache.activemq.ActiveMQConnectionFactory;   
            
          public class SendMessage {   
           
          private static final String url ="tcp://www.5a520.cn :61616";   
           
          private static final String QUEUE_NAME ="choice.queue";   
           
          protected String expectedBody = "<hello>http://www.guihua.org  中國桂花樹!</hello>";   
           
          public void sendMessage() throws JMSException{   
            Connection connection 
          =null;   
            
          try{   
             ActiveMQConnectionFactory connectionFactory 
          =new ActiveMQConnectionFactory(url);   
             connection 
          = (Connection)connectionFactory.createConnection();   
             connection.start();   
             Session session 
          = (Session)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   
             Destination destination 
          = session.createQueue(QUEUE_NAME);   
             MessageProducer producer 
          = session.createProducer(destination);   
             TextMessage message 
          = session.createTextMessage(expectedBody);   
             message.setStringProperty(
          "headname""remoteB");   
             producer.send(message);       
            }
          catch(Exception e){   
             e.printStackTrace();   
            }
             
           }
             
              
           
          public static void main(String[] args){   
            SendMessage sndMsg 
          = new SendMessage();   
            
          try{   
             sndMsg.sendMessage();   
            }
          catch(Exception ex){   
             System.out.println(ex.toString());   
            }
             
           }
             
          }
            

          7.ReceiveMessage(用于接收消息)

          import java.io.File;   
          import java.io.FileInputStream;   
          import java.io.FileOutputStream;   
          import java.io.IOException;   
          import javax.jms.BytesMessage;   
          import javax.jms.Connection;   
          import javax.jms.Destination;   
          import javax.jms.JMSException;   
          import javax.jms.Message;   
          import javax.jms.MessageConsumer;   
          import javax.jms.Session;   
          import javax.jms.TextMessage;   
          import org.apache.activemq.ActiveMQConnectionFactory;   
            
          public class ReceiveMessage {   
           
          private static final String url = "tcp:// www.5a520.cn :61616";   
           
          private static final String QUEUE_NAME = "choice.queue";   
           
          public void receiveMessage() {   
            Connection connection 
          = null;   
            
          try {   
             
          try {   
              ActiveMQConnectionFactory connectionFactory 
          = new ActiveMQConnectionFactory(url);   
              connection 
          = connectionFactory.createConnection();   
             }
           catch (Exception e) {   
                          System.out.println(e.toString());   
             }
             
             connection.start();   
             Session session 
          = connection.createSession(false,   
               Session.AUTO_ACKNOWLEDGE);   
             Destination destination 
          = session.createQueue(QUEUE_NAME);   
             MessageConsumer consumer 
          = session.createConsumer(destination);   
             consumeMessagesAndClose(connection, session, consumer);   
            }
           catch (Exception e) {   
              System.out.println(e.toString());   
            }
             
           }
             
            
           
          protected void consumeMessagesAndClose(Connection connection,Session session, MessageConsumer consumer)   
           
          throws JMSException {   
            
          for (int i = 0; i < 1;) {   
             Message message 
          = consumer.receive(1000);   
             
          if (message != null{   
              i
          ++;   
              onMessage(message);   
             }
             
            }
             
            System.out.println(
          "Closing connection");   
            consumer.close();   
            session.close();   
            connection.close();   
           }
             
            
           
          public void onMessage(Message message) {   
            
          try {   
             
          if (message instanceof TextMessage) {   
              TextMessage txtMsg 
          = (TextMessage) message;   
              String msg 
          = txtMsg.getText();   
              System.out.println(
          "Received: " + msg);   
             }
             
            }
           catch (Exception e) {   
             e.printStackTrace();   
            }
             
            
           }
             
            
           
          public static void main(String args[]) {   
            ReceiveMessage rm 
          = new ReceiveMessage();   
            rm.receiveMessage();   
           }
             
            
          }
            


           

          posted on 2009-04-26 17:47 fun 閱讀(2217) 評論(0)  編輯  收藏

          導(dǎo)航

          統(tǒng)計(jì)

          常用鏈接

          留言簿(11)

          隨筆檔案

          友情鏈接

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 抚宁县| 千阳县| 新巴尔虎右旗| 山阳县| 安龙县| 潼关县| 凌海市| 扎赉特旗| 光山县| 长乐市| 新野县| 文山县| 绥阳县| 襄城县| 长泰县| 房产| 洪洞县| 谢通门县| 科尔| 河南省| 滁州市| 临西县| 马尔康县| 江山市| 洛浦县| 锦州市| 新蔡县| 准格尔旗| 怀化市| 化隆| 阿拉善左旗| 葵青区| 安化县| 怀集县| 嘉鱼县| 赤壁市| 涟源市| 涡阳县| 七台河市| 东宁县| 蛟河市|