少年阿賓

          那些青春的歲月

            BlogJava :: 首頁 :: 聯系 :: 聚合  :: 管理
            500 Posts :: 0 Stories :: 135 Comments :: 0 Trackbacks

          企業中各項目中相互協作的時候可能用得到消息通知機制。比如有東西更新了,可以通知做索引。

          在 Java 里有 JMS 的多個實現。其中 apache 下的 ActiveMQ 就是不錯的選擇。ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。這里示例下使用 ActiveMQ

          用 ActiveMQ 最好還是了解下 JMS

          JMS 公共 點對點域 發布/訂閱域
          ConnectionFactory QueueConnectionFactory TopicConnectionFactory
          Connection QueueConnection TopicConnection
          Destination Queue Topic
          Session QueueSession TopicSession
          MessageProducer QueueSender TopicPublisher
          MessageConsumer QueueReceiver TopicSubscriber

          JMS 定義了兩種方式:Quere(點對點);Topic(發布/訂閱)。

          ConnectionFactory 是連接工廠,負責創建Connection。

          Connection 負責創建 Session。

          Session 創建 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息)。

          Destination 是消息的目的地。

          詳細的可以網上找些 JMS 規范(有中文版)。

          下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解壓,然后雙擊 bin/activemq.bat。運行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。

          Jms 發送 代碼:

          public static void main(String[] args) throws Exception {   
              ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
            
              Connection connection = connectionFactory.createConnection();   
              connection.start();   
            
              Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
              Destination destination = session.createQueue("my-queue");   
            
              MessageProducer producer = session.createProducer(destination);   
              for(int i=0; i<3; i++) {   
                  MapMessage message = session.createMapMessage();   
                  message.setLong("count", new Date().getTime());   
                  Thread.sleep(1000);   
                  //通過消息生產者發出消息   
                  producer.send(message);   
              }   
              session.commit();   
              session.close();   
              connection.close();   
          }



          Jms 接收代碼:


          public static void main(String[] args) throws Exception {   
              ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();   
            
              Connection connection = connectionFactory.createConnection();   
              connection.start();   
            
              final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);   
              Destination destination = session.createQueue("my-queue");   
            
              MessageConsumer consumer = session.createConsumer(destination);   
              int i=0;   
              while(i<3) {   
                  i++;   
                  MapMessage message = (MapMessage) consumer.receive();   
                  session.commit();   
            
                  //TODO something....   
                  System.out.println("收到消息:" + new Date(message.getLong("count")));   
              }   
            
              session.close();   
              connection.close();   
          }



          JMS五種消息的發送/接收的例子

          轉自:http://chenjumin.javaeye.com/blog/687124  

          1、消息發送

          //連接工廠  
          ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                  ActiveMQConnection.DEFAULT_USER,  
                  ActiveMQConnection.DEFAULT_PASSWORD,  
                  "tcp://localhost:61616");  
           
          //連接到JMS提供者  
          Connection conn = connFactory.createConnection();  
          conn.start();  
           
          //事務性會話,自動確認消息  
          Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
           
          //消息的目的地  
          Destination destination = session.createQueue("queue.hello");  
           
          //消息生產者  
          MessageProducer producer = session.createProducer(destination);  
          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化  
           
           
          //文本消息  
          TextMessage textMessage = session.createTextMessage("文本消息");  
          producer.send(textMessage);  
           
          //鍵值對消息  
          MapMessage mapMessage = session.createMapMessage();  
          mapMessage.setLong("age", new Long(32));  
          mapMessage.setDouble("sarray", new Double(5867.15));  
          mapMessage.setString("username", "鍵值對消息");  
          producer.send(mapMessage);  
           
          //流消息  
          StreamMessage streamMessage = session.createStreamMessage();  
          streamMessage.writeString("streamMessage流消息");  
          streamMessage.writeLong(55);  
          producer.send(streamMessage);  
           
          //字節消息  
          String s = "BytesMessage字節消息";  
          BytesMessage bytesMessage = session.createBytesMessage();  
          bytesMessage.writeBytes(s.getBytes());  
          producer.send(bytesMessage);  
           
          //對象消息  
          User user = new User("cjm", "對象消息"); //User對象必須實現Serializable接口  
          ObjectMessage objectMessage = session.createObjectMessage();  
          objectMessage.setObject(user);  
          producer.send(objectMessage);  
           
           
          session.commit(); //在事務性會話中,只有commit之后,消息才會真正到達目的地  
          producer.close();  
          session.close();  
          conn.close(); 



          2、消息接收:通過消息監聽器的方式接收消息


          public class Receiver implements MessageListener{  
              private boolean stop = false;  
                
              public void execute() throws Exception {  
                  //連接工廠  
                  ConnectionFactory connFactory = new ActiveMQConnectionFactory(  
                          ActiveMQConnection.DEFAULT_USER,  
                          ActiveMQConnection.DEFAULT_PASSWORD,  
                          "tcp://localhost:61616");  
                    
                  //連接到JMS提供者  
                  Connection conn = connFactory.createConnection();  
                  conn.start();  
                    
                  //事務性會話,自動確認消息  
                  Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);  
                    
                  //消息的來源地  
                  Destination destination = session.createQueue("queue.hello");  
                    
                  //消息消費者  
                  MessageConsumer consumer = session.createConsumer(destination);  
                  consumer.setMessageListener(this);  
                    
                  //等待接收消息  
                  while(!stop){  
                      Thread.sleep(5000);  
                  }  
                    
                  session.commit();  
                    
                  consumer.close();  
                  session.close();  
                  conn.close();  
              }  
           
              public void onMessage(Message m) {  
                  try{  
                      if(m instanceof TextMessage){ //接收文本消息  
                          TextMessage message = (TextMessage)m;  
                          System.out.println(message.getText());  
                      }else if(m instanceof MapMessage){ //接收鍵值對消息  
                          MapMessage message = (MapMessage)m;  
                          System.out.println(message.getLong("age"));  
                          System.out.println(message.getDouble("sarray"));  
                          System.out.println(message.getString("username"));  
                      }else if(m instanceof StreamMessage){ //接收流消息  
                          StreamMessage message = (StreamMessage)m;  
                          System.out.println(message.readString());  
                          System.out.println(message.readLong());  
                      }else if(m instanceof BytesMessage){ //接收字節消息  
                          byte[] b = new byte[1024];  
                          int len = -1;  
                          BytesMessage message = (BytesMessage)m;  
                          while((len=message.readBytes(b))!=-1){  
                              System.out.println(new String(b, 0, len));  
                          }  
                      }else if(m instanceof ObjectMessage){ //接收對象消息  
                          ObjectMessage message = (ObjectMessage)m;  
                          User user = (User)message.getObject();  
                          System.out.println(user.getUsername() + " _ " + user.getPassword());  
                      }else{  
                          System.out.println(m);  
                      }  
                        
                      stop = true;  
                  }catch(JMSException e){  
                      stop = true;  
                      e.printStackTrace();  
                  }  
              }  





          http://blog.csdn.net/caihaijiang/article/details/5903296
          posted on 2012-08-02 14:59 abin 閱讀(1765) 評論(0)  編輯  收藏 所屬分類: ActiveMQ
          主站蜘蛛池模板: 香格里拉县| 延安市| 班玛县| 瓮安县| 来安县| 类乌齐县| 西藏| 班玛县| 博湖县| 大冶市| 昌江| 郯城县| 临夏县| 十堰市| 延川县| 柳林县| 通城县| 普格县| 合水县| 德惠市| 宝丰县| 宜宾市| 西畴县| 北川| 桃园市| 沙田区| 扶风县| 尼木县| 定结县| 双牌县| 义马市| 阿拉善盟| 东港市| 宣化县| 民勤县| 德昌县| 阜宁县| 元谋县| 大庆市| 新兴县| 荃湾区|