hk2000c技術(shù)專(zhuān)欄

          技術(shù)源于哲學(xué),哲學(xué)來(lái)源于生活 關(guān)心生活,關(guān)注健康,關(guān)心他人

            BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 1 文章 :: 28 評(píng)論 :: 0 Trackbacks

          本篇主要講解在未使用其他框架(Spring)整合情況下,獨(dú)立基于ActiveMQ,使用JMS規(guī)范進(jìn)行消息通信。
              
               一.JMS回顧
                 因?yàn)锳ctiveMQ是一個(gè)JMS Provider的實(shí)現(xiàn),因此在開(kāi)始實(shí)作前,有必要復(fù)習(xí)下JMS的基礎(chǔ)知識(shí)
              Java Message Service (JMS)是sun提出來(lái)的為J2EE提供企業(yè)消息處理的一套規(guī)范,JMS目前有2套規(guī)范還在使用JMS 1.0.2b和1.1. 1.1已經(jīng)成為主流的JMS Provider事實(shí)上的標(biāo)準(zhǔn)了.
                *1.1主要在session上面有一些重要改變,比如支持建立同一session上的transaction,讓他支持同時(shí)發(fā)送P2P(Queue)消息和接受
          Topic消息。
                
                 在JMS中間主要定義了2種消息模式Point-to-Point (點(diǎn)對(duì)點(diǎn)),Publich/Subscribe Model (發(fā)布/訂閱者),
              其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化訂閱)2種消息處理方式。
              
               下面是JMS規(guī)范基本的接口和實(shí)現(xiàn)
               JMS Common Interfacse PTP-Specific Interface   Pub/Sub-specific interfaces
               ConnectionFactory     QueueConnectionFactory   TopicConnectionFactory
               Connection            QueueConnection          TopicConnection
               Destination           Queue                    Topic
               Session               QueueSession             TopiSession
               MessageProducer       QueueSender              TopicPublisher
               MessageConsumer       QueueReceiver/QueueBrwer TopicSubscriber


               二.使用Queue

                   下面以ActiveMQ example的代碼為主進(jìn)行說(shuō)明
                  
                  1. 使用ActiveMQ的Connection,ConnectionFactory 建立連接,注意這里沒(méi)有用到pool
                 

          java 代碼
          1. import org.apache.activemq.ActiveMQConnection   
          2. import org.apache.activemq.ActiveMQConnectionFactory   

                  //建立Connection

          java 代碼
          1. protected Connection createConnection() throws JMSException, Exception {   
          2.      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);   
          3.      Connection connection = connectionFactory.createConnection();   
          4.      if (durable && clientID!=null) {   
          5.          connection.setClientID(clientID);   
          6.      }   
          7.      connection.start();   
          8.      return connection;   
          9.     }  

                  //建立Session
            

          java 代碼
          1. protected Session createSession(Connection connection) throws Exception {   
          2.          Session session = connection.createSession(transacted, ackMode);   
          3.          return session;   
          4.         }   

                  2。發(fā)送消息的代碼
           //建立QueueSession
           

          java 代碼
          1. protected MessageProducer createProducer(Session session) throws JMSException {   
          2.         Destincation destination = session.createQueue("queue.hello");   
          3.         MessageProducer producer = session.createProducer(destination);   
          4.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
          5.            
          6.         if( timeToLive!=0 )   
          7.             producer.setTimeToLive(timeToLive);   
          8.         return producer;   
          9.         }   

                   //使用Producer發(fā)送消息到Queue
              

          java 代碼
          1. producer.send(message);   

                 
                  3。接受消息,在JMS規(guī)范里面,你可以使用
            

          java 代碼
          1. QueueReceiver/QueueBrowser直接接受消息,但是更多的情況下我們采用消息通知方式,即實(shí)現(xiàn)MessageListener接口   
          2.  public void onMessage(Message message) {   
          3.  //process message   
          4.  }   
          5.           
          6.  //set MessageListner ,receive message   
          7.  Destincation destination = session.createQueue("queue.hello");   
          8.  consumer = session.createConsumer(destination);   
          9.  consumer.setMessageListener(this);   

                 
                  以上就是使用jms queue發(fā)送接受消息的基本方式

           
               三 Topic

                  1. 建立連接
             

          java 代碼
          1. protected Connection createConnection() throws JMSException, Exception {      
          2.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);      
          3.         Connection connection = connectionFactory.createConnection();      
          4.         //如果你要使用DurableSubScription 方式,你必須為connection設(shè)置一個(gè)ClientID      
          5.         if (durable && clientID!=null) {      
          6.             connection.setClientID(clientID);      
          7.         }      
          8.         connection.start();      
          9.         return connection;      
          10.        }      

                 2. 建立Session

          java 代碼
          1. protected Session createSession(Connection connection) throws Exception {      
          2.         Session session = connection.createSession(transacted, ackMode);      
          3.         return session;      
          4.         }    

           3.創(chuàng)建Producer 發(fā)送消息到Topic   
                 

          java 代碼
          1. //create topic on  session   
          2.        topic = session.createTopic("topic.hello");   
          3.  producer = session.createProducer(topic);   
          4.        //send message    
          5.        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
          6.  producer.send(message);   


           4.創(chuàng)建Consumer接受消息(基本上和Queue相同)

          java 代碼
          1. Destincation destination  = session.createTopic("topic.hello");      
          2. MessageConsumer consumer = session.createConsumer(destination);      
          3. consumer.setMessageListener(this);      
          4.            
          5.      //如果你使用的是Durable Subscription方式,你必須在建立connection的時(shí)候      
          6.      //設(shè)置ClientID,而且建立comsumer的時(shí)候使用createDurableSubscriber方法,為他指定一個(gè)consumerName。      
          7.  //connection.setClientID(clientId);      
          8.  //consumer = session.createDurableSubscriber((Topic) destination, consumerName);   

                 
           四:連接ActiveMQ的方式
                  ActiveMQConnectionFactory 提供了多種連接到Broker的方式activemq.apache.org/uri-protocols.html

           常見(jiàn)的有
           vm://host:port     //vm
           tcp://host:port    //tcp
           ssl://host:port    //SSL
           stomp://host:port  //stomp協(xié)議可以跨語(yǔ)言,目前有很多種stomp client 庫(kù)(java,c#,c/c++,ruby,python...);

          posted on 2007-11-16 17:05 hk2000c 閱讀(8330) 評(píng)論(2)  編輯  收藏 所屬分類(lèi): JMS

          評(píng)論

          # re: ActiveMQ 實(shí)踐之路(二) 使用Queue或者Topic發(fā)送/接受消息 2009-06-03 16:43 藍(lán)雨
          寫(xiě)的太簡(jiǎn)單了, 初學(xué)者肯定看不懂!  回復(fù)  更多評(píng)論
            

          # re: ActiveMQ 實(shí)踐之路(二) 使用Queue或者Topic發(fā)送/接受消息 2009-10-21 09:08 READ
          確實(shí)如樓上所說(shuō)...  回復(fù)  更多評(píng)論
            

          主站蜘蛛池模板: 屯留县| 布尔津县| 南木林县| 偃师市| 兴业县| 陵川县| 汾阳市| 自治县| 枣强县| 德安县| 万盛区| 色达县| 略阳县| 特克斯县| 肇庆市| 平邑县| 阿合奇县| 晋宁县| 黄冈市| 什邡市| 肃南| 新乡县| 泽库县| 新兴县| 临夏市| 内乡县| 巧家县| 邢台县| 辽中县| 惠安县| 富顺县| 松潘县| 湟源县| 临泽县| 延安市| 达孜县| 东方市| 江安县| 太康县| 开鲁县| 玛多县|