企業(yè)中各項(xiàng)目中相互協(xié)作的時(shí)候可能用得到消息通知機(jī)制。比如有東西更新了,可以通知做索引。
在 Java 里有 JMS 的多個(gè)實(shí)現(xiàn)。其中 apache 下的 ActiveMQ 就是不錯(cuò)的選擇。ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。這里示例下使用 ActiveMQ
用 ActiveMQ 最好還是了解下 JMS
JMS 公共 | 點(diǎn)對(duì)點(diǎn)域 | 發(fā)布/訂閱域 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
JMS 定義了兩種方式:Quere(點(diǎn)對(duì)點(diǎn));Topic(發(fā)布/訂閱)。
ConnectionFactory 是連接工廠,負(fù)責(zé)創(chuàng)建Connection。
Connection 負(fù)責(zé)創(chuàng)建 Session。
Session 創(chuàng)建 MessageProducer(用來發(fā)消息) 和 MessageConsumer(用來接收消息)。
Destination 是消息的目的地。
詳細(xì)的可以網(wǎng)上找些 JMS 規(guī)范(有中文版)。
下載 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解壓,然后雙擊 bin/activemq.bat。運(yùn)行后,可以在 http://localhost:8161/admin 觀察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。
Jms 發(fā)送 代碼:
public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for(int i=0; i<3; i++) {
MapMessage message = session.createMapMessage();
message.setLong("count", new Date().getTime());
Thread.sleep(1000);
//通過消息生產(chǎn)者發(fā)出消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
Jms 接收代碼:
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i=0;
while(i<3) {
i++;
MapMessage message = (MapMessage) consumer.receive();
session.commit();
//TODO something....
System.out.println("收到消息:" + new Date(message.getLong("count")));
}
session.close();
connection.close();
}
JMS五種消息的發(fā)送/接收的例子
轉(zhuǎn)自:http://chenjumin.javaeye.com/blog/687124
1、消息發(fā)送
//連接工廠ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//連接到JMS提供者
Connection conn = connFactory.createConnection();
conn.start();
//事務(wù)性會(huì)話,自動(dòng)確認(rèn)消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的目的地
Destination destination = session.createQueue("queue.hello");
//消息生產(chǎn)者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
//文本消息
TextMessage textMessage = session.createTextMessage("文本消息");
producer.send(textMessage);
//鍵值對(duì)消息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setLong("age", new Long(32));
mapMessage.setDouble("sarray", new Double(5867.15));
mapMessage.setString("username", "鍵值對(duì)消息");
producer.send(mapMessage);
//流消息
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("streamMessage流消息");
streamMessage.writeLong(55);
producer.send(streamMessage);
//字節(jié)消息
String s = "BytesMessage字節(jié)消息";
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(s.getBytes());
producer.send(bytesMessage);
//對(duì)象消息
User user = new User("cjm", "對(duì)象消息"); //User對(duì)象必須實(shí)現(xiàn)Serializable接口
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(user);
producer.send(objectMessage);
session.commit(); //在事務(wù)性會(huì)話中,只有commit之后,消息才會(huì)真正到達(dá)目的地
producer.close();
session.close();
conn.close();
2、消息接收:通過消息監(jiān)聽器的方式接收消息
public class Receiver implements MessageListener{
private boolean stop = false;
public void execute() throws Exception {
//連接工廠
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//連接到JMS提供者
Connection conn = connFactory.createConnection();
conn.start();
//事務(wù)性會(huì)話,自動(dòng)確認(rèn)消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的來源地
Destination destination = session.createQueue("queue.hello");
//消息消費(fèi)者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
//等待接收消息
while(!stop){
Thread.sleep(5000);
}
session.commit();
consumer.close();
session.close();
conn.close();
}
public void onMessage(Message m) {
try{
if(m instanceof TextMessage){ //接收文本消息
TextMessage message = (TextMessage)m;
System.out.println(message.getText());
}else if(m instanceof MapMessage){ //接收鍵值對(duì)消息
MapMessage message = (MapMessage)m;
System.out.println(message.getLong("age"));
System.out.println(message.getDouble("sarray"));
System.out.println(message.getString("username"));
}else if(m instanceof StreamMessage){ //接收流消息
StreamMessage message = (StreamMessage)m;
System.out.println(message.readString());
System.out.println(message.readLong());
}else if(m instanceof BytesMessage){ //接收字節(jié)消息
byte[] b = new byte[1024];
int len = -1;
BytesMessage message = (BytesMessage)m;
while((len=message.readBytes(b))!=-1){
System.out.println(new String(b, 0, len));
}
}else if(m instanceof ObjectMessage){ //接收對(duì)象消息
ObjectMessage message = (ObjectMessage)m;
User user = (User)message.getObject();
System.out.println(user.getUsername() + " _ " + user.getPassword());
}else{
System.out.println(m);
}
stop = true;
}catch(JMSException e){
stop = true;
e.printStackTrace();
}
}
}
http://blog.csdn.net/caihaijiang/article/details/5903296