JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
在 Java 里有 JMS 的多個實現,ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。
JMS 定義了兩種方式:Quere(點對點);Topic(發布/訂閱)。
ConnectionFactory 是連接工廠,負責創建Connection。Connection 負責創建 Session。Destination 是消息的目的地。
Session 創建 MessageProducer(用來發消息) 和 MessageConsumer(用來接收消息)。
ActiveMQ的官方網址:http://activemq.apache.org。在此可以下載ActiveMQ的最新版本和閱讀相關文檔。
下面是使用ActiveMQ發送和接收消息的JAVA實現:
1、消息發送者
package com.jmsd;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 說明: activemq send message
*
* @author xajava
* @version 創建時間:2012-10-24 下午1:22:40
*/
public class JmsSender {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = "ActiveMQ.Demo";
private Destination destination = null;
private Connection conn = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
// 連接工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
conn = connectionFactory.createConnection();
// 事務性會話,自動確認消息
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 消息的目的地(Queue/Topic)
destination = session.createQueue(SUBJECT);
// destination = session.createTopic(SUBJECT);
// 消息的提供者(生產者)
producer = session.createProducer(destination);
// 不持久化消息
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void sendMessage(String msgType) throws JMSException, Exception {
initialize();
// 連接到JMS提供者(服務器)
conn.start();
// 發送文本消息
if ("text".equals(msgType)) {
String textMsg = "ActiveMQ Text Message!";
TextMessage msg = session.createTextMessage();
// TextMessage msg = session.createTextMessage(textMsg);
msg.setText(textMsg);
producer.send(msg);
}
// 發送Map消息
if ("map".equals(msgType)) {
MapMessage msg = session.createMapMessage();
msg.setBoolean("boolean", true);
msg.setShort("short", (short) 0);
msg.setLong("long", 123456);
msg.setString("MapMessage", "ActiveMQ Map Message!");
producer.send(msg);
}
// 發送流消息
if ("stream".equals(msgType)) {
String streamValue = "ActiveMQ stream Message!";
StreamMessage msg = session.createStreamMessage();
msg.writeString(streamValue);
msg.writeBoolean(false);
msg.writeLong(1234567890);
producer.send(msg);
}
// 發送對象消息
if ("object".equals(msgType)) {
JmsObjectMessageBean jmsObject = new JmsObjectMessageBean("ActiveMQ Object Message", 18, false);
ObjectMessage msg = session.createObjectMessage();
msg.setObject(jmsObject);
producer.send(msg);
}
// 發送字節消息
if ("bytes".equals(msgType)) {
String byteValue = "字節消息";
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(byteValue.getBytes());
producer.send(msg);
}
}
// 關閉連接
public void close() throws JMSException {
if (producer != null)
producer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
2、消息接收者
package com.jmsd;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 說明:
*
* @author xajava
* @version 創建時間:2012-10-24 下午2:06:48
*/
public class JmsReceiver implements MessageListener {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = "ActiveMQ.Demo";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageConsumer consumer = null;
private boolean stop = false;
// 初始化
private void initialize() throws JMSException, Exception {
// 連接工廠是用戶創建連接的對象.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
// 連接工廠創建一個jms connection
conn = connectionFactory.createConnection();
// 是生產和消費的一個單線程上下文。會話用于創建消息的生產者,消費者和消息。會話提供了一個事務性的上下文。
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 不支持事務
// 目的地是客戶用來指定他生產消息的目標還有他消費消息的來源的對象.
dest = session.createQueue(SUBJECT);
// dest = session.createTopic(SUBJECT);
// 會話創建消息的生產者將消息發送到目的地
consumer = session.createConsumer(dest);
}
/**
* 消費消息
*
* @throws JMSException
* @throws Exception
*/
public void receiveMessage() throws JMSException, Exception {
initialize();
conn.start();
consumer.setMessageListener(this);
// 等待接收消息
while (!stop) {
Thread.sleep(5000);
}
}
@SuppressWarnings("rawtypes")
@Override
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage message = (TextMessage) msg;
System.out.println("------Received TextMessage------");
System.out.println(message.getText());
} else if (msg instanceof MapMessage) {
MapMessage message = (MapMessage) msg;
System.out.println("------Received MapMessage------");
System.out.println(message.getLong("long"));
System.out.println(message.getBoolean("boolean"));
System.out.println(message.getShort("short"));
System.out.println(message.getString("MapMessage"));
System.out.println("------Received MapMessage for while------");
Enumeration enumer = message.getMapNames();
while (enumer.hasMoreElements()) {
Object obj = enumer.nextElement();
System.out.println(message.getObject(obj.toString()));
}
} else if (msg instanceof StreamMessage) {
StreamMessage message = (StreamMessage) msg;
System.out.println("------Received StreamMessage------");
System.out.println(message.readString());
System.out.println(message.readBoolean());
System.out.println(message.readLong());
} else if (msg instanceof ObjectMessage) {
System.out.println("------Received ObjectMessage------");
ObjectMessage message = (ObjectMessage) msg;
JmsObjectMessageBean jmsObject = (JmsObjectMessageBean) message.getObject();
System.out.println(jmsObject.getUserName() + "__" + jmsObject.getAge() + "__" + jmsObject.isFlag());
} else if (msg instanceof BytesMessage) {
System.out.println("------Received BytesMessage------");
BytesMessage message = (BytesMessage) msg;
byte[] byteContent = new byte[1024];
int length = -1;
StringBuffer content = new StringBuffer();
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(msg);
}
stop = true;
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
this.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 關閉連接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
3、對象消息
package com.jmsd;
import java.io.Serializable;
/**
* 說明: JMS 對象消息示例對象
*
* @author xajava
* @version 創建時間:2012-10-24 下午1:56:07
*/
public class JmsObjectMessageBean implements Serializable {
private static final long serialVersionUID = 2620024932905963095L;
private String userName;
private int age = 16;
private boolean flag = true;
public JmsObjectMessageBean(String userName,int age,boolean flag){
this.setUserName(userName);
this.setAge(age);
this.setFlag(flag);
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
4、測試類
package com.jmsd;
import javax.jms.JMSException;
/**
* 說明:
*
* @author xajava
* @version 創建時間:2012-10-22 下午4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
package com.jmsd;
import javax.jms.JMSException;
/**
* 說明:
*
* @author xajava
* @version 創建時間:2012-10-22 下午4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
package com.jmsd;
import javax.jms.JMSException;
/**
* 說明:
*
* @author xajava
* @version 創建時間:2012-10-22 下午4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}