本篇主要講解在未使用其他框架(Spring)整合情況下,獨(dú)立基于ActiveMQ,使用JMS規(guī)范進(jìn)行消息通信。
一.JMS回顧
因?yàn)锳ctiveMQ是一個(gè)JMS Provider的實(shí)現(xiàn),因此在開(kāi)始實(shí)作前,有必要復(fù)習(xí)下JMS的基礎(chǔ)知識(shí)
Java Message Service (JMS)是sun提出來(lái)的為J2EE提供企業(yè)消息處理的一套規(guī)范,JMS目前有2套規(guī)范還在使用JMS 1.0.2b和1.1. 1.1已經(jīng)成為主流的JMS Provider事實(shí)上的標(biāo)準(zhǔn)了.
*1.1主要在session上面有一些重要改變,比如支持建立同一session上的transaction,讓他支持同時(shí)發(fā)送P2P(Queue)消息和接受
Topic消息。
在JMS中間主要定義了2種消息模式Point-to-Point (點(diǎn)對(duì)點(diǎn)),Publich/Subscribe Model (發(fā)布/訂閱者),
其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化訂閱)2種消息處理方式。
下面是JMS規(guī)范基本的接口和實(shí)現(xiàn)
JMS Common Interfacse PTP-Specific Interface Pub/Sub-specific interfaces
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopiSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver/QueueBrwer TopicSubscriber
二.使用Queue
下面以ActiveMQ example的代碼為主進(jìn)行說(shuō)明
1. 使用ActiveMQ的Connection,ConnectionFactory 建立連接,注意這里沒(méi)有用到pool
java 代碼
- import org.apache.activemq.ActiveMQConnection
- import org.apache.activemq.ActiveMQConnectionFactory
//建立Connection
java 代碼
- protected Connection createConnection() throws JMSException, Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
- Connection connection = connectionFactory.createConnection();
- if (durable && clientID!=null) {
- connection.setClientID(clientID);
- }
- connection.start();
- return connection;
- }
//建立Session
java 代碼
- protected Session createSession(Connection connection) throws Exception {
- Session session = connection.createSession(transacted, ackMode);
- return session;
- }
2。發(fā)送消息的代碼
//建立QueueSession
java 代碼
- protected MessageProducer createProducer(Session session) throws JMSException {
- Destincation destination = session.createQueue("queue.hello");
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- if( timeToLive!=0 )
- producer.setTimeToLive(timeToLive);
- return producer;
- }
//使用Producer發(fā)送消息到Queue
java 代碼
3。接受消息,在JMS規(guī)范里面,你可以使用
java 代碼
- QueueReceiver/QueueBrowser直接接受消息,但是更多的情況下我們采用消息通知方式,即實(shí)現(xiàn)MessageListener接口
- public void onMessage(Message message) {
-
- }
-
-
- Destincation destination = session.createQueue("queue.hello");
- consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
以上就是使用jms queue發(fā)送接受消息的基本方式
三 Topic
1. 建立連接
java 代碼
- protected Connection createConnection() throws JMSException, Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
- Connection connection = connectionFactory.createConnection();
-
- if (durable && clientID!=null) {
- connection.setClientID(clientID);
- }
- connection.start();
- return connection;
- }
2. 建立Session
java 代碼
- protected Session createSession(Connection connection) throws Exception {
- Session session = connection.createSession(transacted, ackMode);
- return session;
- }
3.創(chuàng)建Producer 發(fā)送消息到Topic
java 代碼
-
- topic = session.createTopic("topic.hello");
- producer = session.createProducer(topic);
-
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(message);
4.創(chuàng)建Consumer接受消息(基本上和Queue相同)
java 代碼
- Destincation destination = session.createTopic("topic.hello");
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(this);
-
-
-
-
-
四:連接ActiveMQ的方式
ActiveMQConnectionFactory 提供了多種連接到Broker的方式activemq.apache.org/uri-protocols.html
常見(jiàn)的有
vm://host:port //vm
tcp://host:port //tcp
ssl://host:port //SSL
stomp://host:port //stomp協(xié)議可以跨語(yǔ)言,目前有很多種stomp client 庫(kù)(java,c#,c/c++,ruby,python...);
|