hk2000c技術(shù)專(zhuān)欄

          技術(shù)源于哲學(xué),哲學(xué)來(lái)源于生活 關(guān)心生活,關(guān)注健康,關(guān)心他人

            BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 1 文章 :: 28 評(píng)論 :: 0 Trackbacks

          本篇主要講解在未使用其他框架(Spring)整合情況下,獨(dú)立基于ActiveMQ,使用JMS規(guī)范進(jìn)行消息通信。
              
               一.JMS回顧
                 因?yàn)锳ctiveMQ是一個(gè)JMS Provider的實(shí)現(xiàn),因此在開(kāi)始實(shí)作前,有必要復(fù)習(xí)下JMS的基礎(chǔ)知識(shí)
              Java Message Service (JMS)是sun提出來(lái)的為J2EE提供企業(yè)消息處理的一套規(guī)范,JMS目前有2套規(guī)范還在使用JMS 1.0.2b和1.1. 1.1已經(jīng)成為主流的JMS Provider事實(shí)上的標(biāo)準(zhǔn)了.
                *1.1主要在session上面有一些重要改變,比如支持建立同一session上的transaction,讓他支持同時(shí)發(fā)送P2P(Queue)消息和接受
          Topic消息。
                
                 在JMS中間主要定義了2種消息模式Point-to-Point (點(diǎn)對(duì)點(diǎn)),Publich/Subscribe Model (發(fā)布/訂閱者),
              其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化訂閱)2種消息處理方式。
              
               下面是JMS規(guī)范基本的接口和實(shí)現(xiàn)
               JMS Common Interfacse PTP-Specific Interface   Pub/Sub-specific interfaces
               ConnectionFactory     QueueConnectionFactory   TopicConnectionFactory
               Connection            QueueConnection          TopicConnection
               Destination           Queue                    Topic
               Session               QueueSession             TopiSession
               MessageProducer       QueueSender              TopicPublisher
               MessageConsumer       QueueReceiver/QueueBrwer TopicSubscriber


               二.使用Queue

                   下面以ActiveMQ example的代碼為主進(jìn)行說(shuō)明
                  
                  1. 使用ActiveMQ的Connection,ConnectionFactory 建立連接,注意這里沒(méi)有用到pool
                 

          java 代碼
          1. import org.apache.activemq.ActiveMQConnection   
          2. import org.apache.activemq.ActiveMQConnectionFactory   

                  //建立Connection

          java 代碼
          1. protected Connection createConnection() throws JMSException, Exception {   
          2.      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);   
          3.      Connection connection = connectionFactory.createConnection();   
          4.      if (durable && clientID!=null) {   
          5.          connection.setClientID(clientID);   
          6.      }   
          7.      connection.start();   
          8.      return connection;   
          9.     }  

                  //建立Session
            

          java 代碼
          1. protected Session createSession(Connection connection) throws Exception {   
          2.          Session session = connection.createSession(transacted, ackMode);   
          3.          return session;   
          4.         }   

                  2。發(fā)送消息的代碼
           //建立QueueSession
           

          java 代碼
          1. protected MessageProducer createProducer(Session session) throws JMSException {   
          2.         Destincation destination = session.createQueue("queue.hello");   
          3.         MessageProducer producer = session.createProducer(destination);   
          4.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
          5.            
          6.         if( timeToLive!=0 )   
          7.             producer.setTimeToLive(timeToLive);   
          8.         return producer;   
          9.         }   

                   //使用Producer發(fā)送消息到Queue
              

          java 代碼
          1. producer.send(message);   

                 
                  3。接受消息,在JMS規(guī)范里面,你可以使用
            

          java 代碼
          1. QueueReceiver/QueueBrowser直接接受消息,但是更多的情況下我們采用消息通知方式,即實(shí)現(xiàn)MessageListener接口   
          2.  public void onMessage(Message message) {   
          3.  //process message   
          4.  }   
          5.           
          6.  //set MessageListner ,receive message   
          7.  Destincation destination = session.createQueue("queue.hello");   
          8.  consumer = session.createConsumer(destination);   
          9.  consumer.setMessageListener(this);   

                 
                  以上就是使用jms queue發(fā)送接受消息的基本方式

           
               三 Topic

                  1. 建立連接
             

          java 代碼
          1. protected Connection createConnection() throws JMSException, Exception {      
          2.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);      
          3.         Connection connection = connectionFactory.createConnection();      
          4.         //如果你要使用DurableSubScription 方式,你必須為connection設(shè)置一個(gè)ClientID      
          5.         if (durable && clientID!=null) {      
          6.             connection.setClientID(clientID);      
          7.         }      
          8.         connection.start();      
          9.         return connection;      
          10.        }      

                 2. 建立Session

          java 代碼
          1. protected Session createSession(Connection connection) throws Exception {      
          2.         Session session = connection.createSession(transacted, ackMode);      
          3.         return session;      
          4.         }    

           3.創(chuàng)建Producer 發(fā)送消息到Topic   
                 

          java 代碼
          1. //create topic on  session   
          2.        topic = session.createTopic("topic.hello");   
          3.  producer = session.createProducer(topic);   
          4.        //send message    
          5.        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
          6.  producer.send(message);   


           4.創(chuàng)建Consumer接受消息(基本上和Queue相同)

          java 代碼
          1. Destincation destination  = session.createTopic("topic.hello");      
          2. MessageConsumer consumer = session.createConsumer(destination);      
          3. consumer.setMessageListener(this);      
          4.            
          5.      //如果你使用的是Durable Subscription方式,你必須在建立connection的時(shí)候      
          6.      //設(shè)置ClientID,而且建立comsumer的時(shí)候使用createDurableSubscriber方法,為他指定一個(gè)consumerName。      
          7.  //connection.setClientID(clientId);      
          8.  //consumer = session.createDurableSubscriber((Topic) destination, consumerName);   

                 
           四:連接ActiveMQ的方式
                  ActiveMQConnectionFactory 提供了多種連接到Broker的方式activemq.apache.org/uri-protocols.html

           常見(jiàn)的有
           vm://host:port     //vm
           tcp://host:port    //tcp
           ssl://host:port    //SSL
           stomp://host:port  //stomp協(xié)議可以跨語(yǔ)言,目前有很多種stomp client 庫(kù)(java,c#,c/c++,ruby,python...);

          posted on 2007-11-16 17:05 hk2000c 閱讀(8333) 評(píng)論(2)  編輯  收藏 所屬分類(lèi): JMS

          評(píng)論

          # re: ActiveMQ 實(shí)踐之路(二) 使用Queue或者Topic發(fā)送/接受消息 2009-06-03 16:43 藍(lán)雨
          寫(xiě)的太簡(jiǎn)單了, 初學(xué)者肯定看不懂!  回復(fù)  更多評(píng)論
            

          # re: ActiveMQ 實(shí)踐之路(二) 使用Queue或者Topic發(fā)送/接受消息 2009-10-21 09:08 READ
          確實(shí)如樓上所說(shuō)...  回復(fù)  更多評(píng)論
            

          主站蜘蛛池模板: 金昌市| 册亨县| 望谟县| 丹东市| 丰县| 伊川县| 兴宁市| 银川市| 新河县| 沭阳县| 临安市| 湘潭市| 义马市| 石阡县| 汶川县| 东兰县| 洪江市| 诏安县| 广灵县| 金塔县| 渭南市| 汕尾市| 宁津县| 铜鼓县| 三门县| 页游| 共和县| 沅陵县| 靖州| 济源市| 剑河县| 奉节县| 杂多县| 青州市| 南岸区| 平南县| 夹江县| 呼图壁县| 河北区| 项城市| 南宫市|