少年阿賓

          那些青春的歲月

            BlogJava :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
            500 Posts :: 0 Stories :: 135 Comments :: 0 Trackbacks

          企業(yè)中各項(xiàng)目中相互協(xié)作的時(shí)候可能用得到消息通知機(jī)制。比如有東西更新了,可以通知做索引。

          在 Java 里有 JMS 的多個(gè)實(shí)現(xiàn)。其中 apache 下的 ActiveMQ 就是不錯(cuò)的選擇。ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。這里示例下使用 ActiveMQ

          用 ActiveMQ 最好還是了解下 JMS

          JMS 公共 點(diǎn)對(duì)點(diǎn)域 發(fā)布/訂閱域
          ConnectionFactory QueueConnectionFactory TopicConnectionFactory
          Connection QueueConnection TopicConnection
          Destination Queue Topic
          Session QueueSession TopicSession
          MessageProducer QueueSender TopicPublisher
          MessageConsumer QueueReceiver TopicSubscriber

          JMS 定義了兩種方式:Quere(點(diǎn)對(duì)點(diǎn));Topic(發(fā)布/訂閱)。

          ConnectionFactory 是連接工廠,負(fù)責(zé)創(chuàng)建Connection。

          Connection 負(fù)責(zé)創(chuàng)建 Session。

          Session 創(chuàng)建 MessageProducer(用來發(fā)消息) 和 MessageConsumer(用來接收消息)。

          Destination 是消息的目的地。

          詳細(xì)的可以網(wǎng)上找些 JMS 規(guī)范(有中文版)。

          下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解壓,然后雙擊 bin/activemq.bat。運(yùn)行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。

          Jms 發(fā)送 代碼:

          public static void main(String[] args) throws Exception {   
              ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
            
              Connection connection = connectionFactory.createConnection();   
              connection.start();   
            
              Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
              Destination destination = session.createQueue("my-queue");   
            
              MessageProducer producer = session.createProducer(destination);   
              for(int i=0; i<3; i++) {   
                  MapMessage message = session.createMapMessage();   
                  message.setLong("count", new Date().getTime());   
                  Thread.sleep(1000);   
                  //通過消息生產(chǎn)者發(fā)出消息   
                  producer.send(message);   
              }   
              session.commit();   
              session.close();   
              connection.close();   
          }



          Jms 接收代碼:


          public static void main(String[] args) throws Exception {   
              ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
            
              Connection connection = connectionFactory.createConnection();   
              connection.start();   
            
              final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
              Destination destination = session.createQueue("my-queue");   
            
              MessageConsumer consumer = session.createConsumer(destination);   
              int i=0;   
              while(i<3) {   
                  i++;   
                  MapMessage message = (MapMessage) consumer.receive();   
                  session.commit();   
            
                  //TODO something....   
                  System.out.println("收到消息:" + new Date(message.getLong("count")));   
              }   
            
              session.close();   
              connection.close();   
          }



          JMS五種消息的發(fā)送/接收的例子

          轉(zhuǎn)自:http://chenjumin.javaeye.com/blog/687124  

          1、消息發(fā)送

          //連接工廠  
          ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                  ActiveMQConnection.DEFAULT_USER,  
                  ActiveMQConnection.DEFAULT_PASSWORD,  
                  "tcp://localhost:61616");  
           
          //連接到JMS提供者  
          Connection conn = connFactory.createConnection();  
          conn.start();  
           
          //事務(wù)性會(huì)話,自動(dòng)確認(rèn)消息  
          Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
           
          //消息的目的地  
          Destination destination = session.createQueue("queue.hello");  
           
          //消息生產(chǎn)者  
          MessageProducer producer = session.createProducer(destination);  
          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化  
           
           
          //文本消息  
          TextMessage textMessage = session.createTextMessage("文本消息");  
          producer.send(textMessage);  
           
          //鍵值對(duì)消息  
          MapMessage mapMessage = session.createMapMessage();  
          mapMessage.setLong("age", new Long(32));  
          mapMessage.setDouble("sarray", new Double(5867.15));  
          mapMessage.setString("username", "鍵值對(duì)消息");  
          producer.send(mapMessage);  
           
          //流消息  
          StreamMessage streamMessage = session.createStreamMessage();  
          streamMessage.writeString("streamMessage流消息");  
          streamMessage.writeLong(55);  
          producer.send(streamMessage);  
           
          //字節(jié)消息  
          String s = "BytesMessage字節(jié)消息";  
          BytesMessage bytesMessage = session.createBytesMessage();  
          bytesMessage.writeBytes(s.getBytes());  
          producer.send(bytesMessage);  
           
          //對(duì)象消息  
          User user = new User("cjm", "對(duì)象消息"); //User對(duì)象必須實(shí)現(xiàn)Serializable接口  
          ObjectMessage objectMessage = session.createObjectMessage();  
          objectMessage.setObject(user);  
          producer.send(objectMessage);  
           
           
          session.commit(); //在事務(wù)性會(huì)話中,只有commit之后,消息才會(huì)真正到達(dá)目的地  
          producer.close();  
          session.close();  
          conn.close(); 



          2、消息接收:通過消息監(jiān)聽器的方式接收消息


          public class Receiver implements MessageListener{  
              private boolean stop = false;  
                
              public void execute() throws Exception {  
                  //連接工廠  
                  ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                          ActiveMQConnection.DEFAULT_USER,  
                          ActiveMQConnection.DEFAULT_PASSWORD,  
                          "tcp://localhost:61616");  
                    
                  //連接到JMS提供者  
                  Connection conn = connFactory.createConnection();  
                  conn.start();  
                    
                  //事務(wù)性會(huì)話,自動(dòng)確認(rèn)消息  
                  Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
                    
                  //消息的來源地  
                  Destination destination = session.createQueue("queue.hello");  
                    
                  //消息消費(fèi)者  
                  MessageConsumer consumer = session.createConsumer(destination);  
                  consumer.setMessageListener(this);  
                    
                  //等待接收消息  
                  while(!stop){  
                      Thread.sleep(5000);  
                  }  
                    
                  session.commit();  
                    
                  consumer.close();  
                  session.close();  
                  conn.close();  
              }  
           
              public void onMessage(Message m) {  
                  try{  
                      if(m instanceof TextMessage){ //接收文本消息  
                          TextMessage message = (TextMessage)m;  
                          System.out.println(message.getText());  
                      }else if(m instanceof MapMessage){ //接收鍵值對(duì)消息  
                          MapMessage message = (MapMessage)m;  
                          System.out.println(message.getLong("age"));  
                          System.out.println(message.getDouble("sarray"));  
                          System.out.println(message.getString("username"));  
                      }else if(m instanceof StreamMessage){ //接收流消息  
                          StreamMessage message = (StreamMessage)m;  
                          System.out.println(message.readString());  
                          System.out.println(message.readLong());  
                      }else if(m instanceof BytesMessage){ //接收字節(jié)消息  
                          byte[] b = new byte[1024];  
                          int len = -1;  
                          BytesMessage message = (BytesMessage)m;  
                          while((len=message.readBytes(b))!=-1){  
                              System.out.println(new String(b, 0, len));  
                          }  
                      }else if(m instanceof ObjectMessage){ //接收對(duì)象消息  
                          ObjectMessage message = (ObjectMessage)m;  
                          User user = (User)message.getObject();  
                          System.out.println(user.getUsername() + " _ " + user.getPassword());  
                      }else{  
                          System.out.println(m);  
                      }  
                        
                      stop = true;  
                  }catch(JMSException e){  
                      stop = true;  
                      e.printStackTrace();  
                  }  
              }  





          http://blog.csdn.net/caihaijiang/article/details/5903296
          posted on 2012-08-02 14:59 abin 閱讀(1759) 評(píng)論(0)  編輯  收藏 所屬分類: ActiveMQ

          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 高密市| 武城县| 遂宁市| 任丘市| 宁晋县| 靖宇县| 西吉县| 嘉义市| 翁源县| 湖南省| 芦溪县| 张家川| 南宁市| 万全县| 岳阳县| 大姚县| 海伦市| 砀山县| 驻马店市| 长武县| 阳曲县| 桐乡市| 丰都县| 衡阳县| 阆中市| 怀集县| 天峨县| 四会市| 阜宁县| 曲周县| 蒙自县| 纳雍县| 怀化市| 睢宁县| 红桥区| 鹤山市| 彰化市| 肥城市| 临夏县| 九江县| 中宁县|