鍦?Java 閲屾湁 JMS 鐨勫涓疄鐜幫紝ActiveMQ 鏄疉pache鍑哄搧錛屾渶嫻佽鐨勶紝鑳藉姏寮哄姴鐨勫紑婧愭秷鎭葷嚎銆侫ctiveMQ 鏄竴涓畬鍏ㄦ敮鎸丣MS1.1鍜孞2EE 1.4瑙勮寖鐨?JMS Provider瀹炵幇銆?/p>
JMS 瀹氫箟浜嗕袱縐嶆柟寮忥細(xì)Quere錛堢偣瀵圭偣錛夛紱Topic錛堝彂甯?璁㈤槄錛夈?/p>
ConnectionFactory 鏄繛鎺ュ伐鍘傦紝璐熻矗鍒涘緩Connection銆侰onnection 璐熻矗鍒涘緩 Session銆侱estination 鏄秷鎭殑鐩殑鍦般?/p>
Session 鍒涘緩 MessageProducer錛堢敤鏉ュ彂娑堟伅錛?鍜?MessageConsumer錛堢敤鏉ユ帴鏀舵秷鎭級銆?/p>
ActiveMQ鐨勫畼鏂圭綉鍧錛?a >http://activemq.apache.org銆傚湪姝ゅ彲浠ヤ笅杞紸ctiveMQ鐨勬渶鏂扮増鏈拰闃呰鐩稿叧鏂囨。銆?/p>
涓嬮潰鏄嬌鐢ˋctiveMQ鍙戦佸拰鎺ユ敹娑堟伅鐨凧AVA瀹炵幇錛?/p>
1銆佹秷鎭彂閫佽?/p>
package com.jmsd;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 璇存槑錛?activemq send message
*
* @author xajava
* @version 鍒涘緩鏃墮棿錛?012-10-24 涓嬪崍1:22:40
*/
public class JmsSender {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = "ActiveMQ.Demo";
private Destination destination = null;
private Connection conn = null;
private Session session = null;
private MessageProducer producer = null;
// 鍒濆鍖?br /> private void initialize() throws JMSException, Exception {
// 榪炴帴宸ュ巶
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
conn = connectionFactory.createConnection();
// 浜嬪姟鎬т細(xì)璇濓紝鑷姩紜娑堟伅
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 娑堟伅鐨勭洰鐨勫湴錛圦ueue/Topic錛?br />destination = session.createQueue(SUBJECT);
// destination = session.createTopic(SUBJECT);
// 娑堟伅鐨勬彁渚涜咃紙鐢熶駭鑰咃級
producer = session.createProducer(destination);
// 涓嶆寔涔呭寲娑堟伅
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void sendMessage(String msgType) throws JMSException, Exception {
initialize();
// 榪炴帴鍒癑MS鎻愪緵鑰咃紙鏈嶅姟鍣級
conn.start();
// 鍙戦佹枃鏈秷鎭?br />if ("text".equals(msgType)) {
String textMsg = "ActiveMQ Text Message!";
TextMessage msg = session.createTextMessage();
// TextMessage msg = session.createTextMessage(textMsg);
msg.setText(textMsg);
producer.send(msg);
}
// 鍙戦丮ap娑堟伅
if ("map".equals(msgType)) {
MapMessage msg = session.createMapMessage();
msg.setBoolean("boolean", true);
msg.setShort("short", (short) 0);
msg.setLong("long", 123456);
msg.setString("MapMessage", "ActiveMQ Map Message!");
producer.send(msg);
}
// 鍙戦佹祦娑堟伅
if ("stream".equals(msgType)) {
String streamValue = "ActiveMQ stream Message!";
StreamMessage msg = session.createStreamMessage();
msg.writeString(streamValue);
msg.writeBoolean(false);
msg.writeLong(1234567890);
producer.send(msg);
}
// 鍙戦佸璞℃秷鎭?br />if ("object".equals(msgType)) {
JmsObjectMessageBean jmsObject = new JmsObjectMessageBean("ActiveMQ Object Message", 18, false);
ObjectMessage msg = session.createObjectMessage();
msg.setObject(jmsObject);
producer.send(msg);
}
// 鍙戦佸瓧鑺傛秷鎭?br />if ("bytes".equals(msgType)) {
String byteValue = "瀛楄妭娑堟伅";
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(byteValue.getBytes());
producer.send(msg);
}
}
// 鍏抽棴榪炴帴
public void close() throws JMSException {
if (producer != null)
producer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
2銆佹秷鎭帴鏀惰?/p>
package com.jmsd;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 璇存槑錛?br /> *
* @author xajava
* @version 鍒涘緩鏃墮棿錛?012-10-24 涓嬪崍2:06:48
*/
public class JmsReceiver implements MessageListener {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = "ActiveMQ.Demo";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageConsumer consumer = null;
private boolean stop = false;
// 鍒濆鍖?br /> private void initialize() throws JMSException, Exception {
// 榪炴帴宸ュ巶鏄敤鎴峰垱寤鴻繛鎺ョ殑瀵硅薄.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
// 榪炴帴宸ュ巶鍒涘緩涓涓猨ms connection
conn = connectionFactory.createConnection();
// 鏄敓浜у拰娑堣垂鐨勪竴涓崟綰跨▼涓婁笅鏂囥備細(xì)璇濈敤浜庡垱寤烘秷鎭殑鐢熶駭鑰咃紝娑堣垂鑰呭拰娑堟伅銆備細(xì)璇濇彁渚涗簡涓涓簨鍔℃х殑涓婁笅鏂囥?br /> session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 涓嶆敮鎸佷簨鍔?br /> // 鐩殑鍦版槸瀹㈡埛鐢ㄦ潵鎸囧畾浠栫敓浜ф秷鎭殑鐩爣榪樻湁浠栨秷璐規(guī)秷鎭殑鏉ユ簮鐨勫璞?
dest = session.createQueue(SUBJECT);
// dest = session.createTopic(SUBJECT);
// 浼?xì)璇濆垱寰忔秷鎭殑鐢熶骇鑰呭皢娑堟伅鍙戦佸埌鐩殑鍦?br /> consumer = session.createConsumer(dest);
}
/**
* 娑堣垂娑堟伅
*
* @throws JMSException
* @throws Exception
*/
public void receiveMessage() throws JMSException, Exception {
initialize();
conn.start();
consumer.setMessageListener(this);
// 絳夊緟鎺ユ敹娑堟伅
while (!stop) {
Thread.sleep(5000);
}
}
@SuppressWarnings("rawtypes")
@Override
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage message = (TextMessage) msg;
System.out.println("------Received TextMessage------");
System.out.println(message.getText());
} else if (msg instanceof MapMessage) {
MapMessage message = (MapMessage) msg;
System.out.println("------Received MapMessage------");
System.out.println(message.getLong("long"));
System.out.println(message.getBoolean("boolean"));
System.out.println(message.getShort("short"));
System.out.println(message.getString("MapMessage"));
System.out.println("------Received MapMessage for while------");
Enumeration enumer = message.getMapNames();
while (enumer.hasMoreElements()) {
Object obj = enumer.nextElement();
System.out.println(message.getObject(obj.toString()));
}
} else if (msg instanceof StreamMessage) {
StreamMessage message = (StreamMessage) msg;
System.out.println("------Received StreamMessage------");
System.out.println(message.readString());
System.out.println(message.readBoolean());
System.out.println(message.readLong());
} else if (msg instanceof ObjectMessage) {
System.out.println("------Received ObjectMessage------");
ObjectMessage message = (ObjectMessage) msg;
JmsObjectMessageBean jmsObject = (JmsObjectMessageBean) message.getObject();
System.out.println(jmsObject.getUserName() + "__" + jmsObject.getAge() + "__" + jmsObject.isFlag());
} else if (msg instanceof BytesMessage) {
System.out.println("------Received BytesMessage------");
BytesMessage message = (BytesMessage) msg;
byte[] byteContent = new byte[1024];
int length = -1;
StringBuffer content = new StringBuffer();
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(msg);
}
stop = true;
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
this.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 鍏抽棴榪炴帴
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
3銆佸璞℃秷鎭?/p>
package com.jmsd;
import java.io.Serializable;
/**
* 璇存槑錛?JMS 瀵硅薄娑堟伅紺轟緥瀵硅薄
*
* @author xajava
* @version 鍒涘緩鏃墮棿錛?012-10-24 涓嬪崍1:56:07
*/
public class JmsObjectMessageBean implements Serializable {
private static final long serialVersionUID = 2620024932905963095L;
private String userName;
private int age = 16;
private boolean flag = true;
public JmsObjectMessageBean(String userName,int age,boolean flag){
this.setUserName(userName);
this.setAge(age);
this.setFlag(flag);
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
4銆佹祴璇曠被
package com.jmsd;
import javax.jms.JMSException;
/**
* 璇存槑錛?
*
* @author xajava
* @version 鍒涘緩鏃墮棿錛?012-10-22 涓嬪崍4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
package com.jmsd;
import javax.jms.JMSException;
/**
* 璇存槑錛?
*
* @author xajava
* @version 鍒涘緩鏃墮棿錛?012-10-22 涓嬪崍4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
package com.jmsd;
import javax.jms.JMSException;
/**
* 璇存槑錛?
*
* @author xajava
* @version 鍒涘緩鏃墮棿錛?012-10-22 涓嬪崍4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}