軟件藝術思考者 |
|
|||
混沌,彷徨,立志,蓄勢... |
公告
日歷
導航隨筆分類(86)
隨筆檔案(85)
搜索最新評論
閱讀排行榜評論排行榜 |
首
先可以到網站上下載最新的openjms版本,然后啟動bin下的startup.bat就啟動服務了,然后可以啟動admin.bat進入管理界面(非
必要步驟),然后分別運行samples下的basic例子就可以看到效果了,當然以前有篇文章介紹的不錯,可以參考一下,不過時間上比較老了,做為參考
還是可以的。
本文介紹開源的JMS服務器openJms,及怎樣使用openJms來構建系統之間健全、高度可用的通訊,從而簡化企業級應用的開發。 openJms符合SUN的JMS API 1.0.2規范,支持消息隊列,還支持消息傳遞的發布/訂閱模式,本文先就系統服務的搭建及JMS的非結構化消息發送和接收進行說明。 JMS 有五種消息類型。三種結構化或半結構化的消息類型(MapMessage、ObjectMessage 和 StreamMessage)以及兩種非結構化的或自由格式的消息類型(TextMessage 和 BytesMessage)。而這里雖然我們只對非結構化消息進行說明,但非結構化的消息格式卻能夠更好地進行交互操作,因為它們在消息上很少利用結構, 在此基礎上與XML再進行結合,將能方便的進行更好的擴展,XML相關簡化操作參考《Jaxb來實現Java-XML的轉換》。 下面具體來介紹服務器搭建,在http://openjms.sourceforge.net/downloads.html下載openJms,解壓后 可以直接使用,在 "openjms-0.7.6.1"bin 里,有openJms的運行腳本,執行 startup 啟動,彈出一個新的窗口,服務就運行在新窗口內,shutdown 為停止命令: 服務運行后,就可以開始使用JMS服務了,至此服務搭建完畢,簡單得不能再簡單了。 下面是消息發送和接收的開發,開發中需要的jar包在"openjms-0.7.6.1"lib里可以找到: openjms-0.7.6.1.jar jms-1.0.2a.jar exolabcore-0.3.7.jar commons-logging-1.0.3.jar 把上面的類包加入到項目中,下面是消息發送服務的代碼: package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; public class QueueSend { public static void main(String[] args) { try { //取得JNDI上下文和連接 Hashtable properties = new Hashtable(); properties.put( Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); //openJms默認的端口是1099 properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //獲得JMS信息連接隊列工廠 QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context.lookup( "JmsQueueConnectionFactory"); //獲得JMS信息連接隊列 QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); //產生隊列Session,設置事務為false,自動應答消息接收 QueueSession queueSession = queueConnection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE); //獲得默認內建在JMS里的隊列之一:queue1 Queue queue = (Queue) context.lookup("queue1"); //產生JMS隊列發送器 QueueSender queueSender = queueSession.createSender(queue); //發送數據到JMS TextMessage message = queueSession.createTextMessage(); message.setText("Hello, I'm openJms."); queueSender.send(message); System.out.println( ""信息寫入JMS服務器隊列"); //以下做清除工作,代碼略 // ... ... } catch (Exception e) { e.printStackTrace(); } } } 執行程序發送消息,然后打開JMS控制臺,用 admin 命令啟動管理平臺,點擊菜單Actions-Connections-online,出現界面如下: 可以看到JSM默認的隊列queue1里已經有1條消息了,而其他的隊列還是空著的。 下面我們來看看消息接收服務的代碼: package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; public class QueueReceiveSynchronous { public static void main(String[] args) { try { //取得JNDI上下文和連接 Hashtable properties = new Hashtable(); properties.put( Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //獲得JMS信息連接隊列工廠 QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context.lookup( "JmsQueueConnectionFactory"); //獲得JMS信息連接隊列 QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); //啟動接收隊列線程 queueConnection.start(); //產生隊列Session,設置事務為false,自動應答消息接收 QueueSession queueSession = queueConnection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE); //獲得默認內建在JMS里的隊列之一:queue1 Queue queue = (Queue) context.lookup("queue1"); //產生JMS隊列接收器 QueueReceiver queueReceiver = queueSession.createReceiver(queue); //通過同步的方法接收消息 Message message = queueReceiver.receive(); String messageText = null; if (message instanceof TextMessage) messageText = ((TextMessage) message). getText(); System.out.println(messageText); //以下做清除工作,代碼略 // ... ... } catch (Exception e) { e.printStackTrace(); } } } 編譯后運行接收信息服務,可以看到接收到并打印之前發送的消息,再看看控制臺,發現queue1的消息隊列變為0,消息已被讀取,消息發送和接收到此結束。 上篇openJms介紹 (一) 提到了openJms的構建及消息的發送和接收,這篇主要了解消息的發布和訂閱。JMS 的發布/訂閱模型定義了如何向一個內容節點發布和訂閱消息,內容節點也叫主題(topic),主題是為發布者(publisher)和訂閱者 (subscribe) 提供傳輸的中介。發布/訂閱模型使發布者和訂閱者之間不需要直接通訊(如RMI)就可保證消息的傳送,有效解決系統間耦合問題(當然有這個需要才行),還 有就是提供了一對一、一對多的通訊方式,比較靈活。 先介紹JMS里2個概念,持久訂閱模式和非持久訂閱模式,其實也是發布/訂閱模型在可靠性上提供的2種方式: 非持久訂閱模式:只有當客戶端處于激活狀態,也就是和JMS 服務器保持連接的狀態下,才能接收到發送到某個Topic的消息,而當客戶端處于離線狀態時,則這個時間段發到Topic的消息將會永遠接收不到。 持久訂閱模式:客戶端向JMS 注冊一個識別自己身份的ID,當這個客戶端處于離線時,JMS 服務器會為這個ID 保存所有發送到主題的消息,當客戶再次連接到JMS 服務器時,會根據自己的ID 得到所有當自己處于離線時發送到主題的消息,即消息永遠能接收到。 下面我們就接著來看openJms在發布/訂閱模式上的表現,由于篇幅關系,在這里只講述非持久訂閱模式,持久訂閱模式可以根據JMS的標準來試。 消息發布的代碼如下: package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; public class TopicPublish { public static void main(String[] args) { try { //取得JNDI上下文和連接 Hashtable properties = new Hashtable(); properties.put( Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); //openJms默認的端口是1099 properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //獲得JMS Topic連接隊列工廠 TopicConnectionFactory factory = (TopicConnectionFactory) context.lookup( "JmsTopicConnectionFactory"); //創建一個Topic連接,并啟動 TopicConnection topicConnection = factory.createTopicConnection(); topicConnection.start(); //創建一個Topic會話,并設置自動應答 TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); //lookup 得到 topic1 Topic topic = (Topic) context.lookup("topic1"); //用Topic會話生成Topic發布器 TopicPublisher topicPublisher = topicSession.createPublisher(topic); //發布消息到Topic System.out.println("消息發布到Topic"); TextMessage message = topicSession.createTextMessage ("你好,歡迎定購Topic類消息"); topicPublisher.publish(message); //資源清除,代碼略 ... ... } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } } 而訂閱消息的接收有同步的和異步2種,他們分別使用receive()和onMessage(Message message)方法來接收消息,具體代碼: 同步接收: package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; public class TopicSubscribeSynchronous { public static void main(String[] args) { try { System.out.println("定購消息接收啟動:"); //取得JNDI上下文和連接 Hashtable properties = new Hashtable(); properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //獲得Topic工廠和Connection TopicConnectionFactory factory = (TopicConnectionFactory) context.lookup( "JmsTopicConnectionFactory"); TopicConnection topicConnection = factory.createTopicConnection(); topicConnection.start(); //創建Topic的會話,用于接收信息 TopicSession topicSession = topicConnection.createTopicSession( false, Session.AUTO_ACKNOWLEDGE); //lookup topic1 Topic topic = (Topic) context.lookup("topic1"); //創建Topic subscriber TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); //收滿10條訂閱消息則退出 for (int i=0; i<10; i++) { //同步消息接收,使用receive方法,堵塞等待,直到接收消息 TextMessage message = (TextMessage) topicSubscriber.receive(); System.out.println("接收訂閱消息["+i+"]: " + message.getText()); } //資源清除,代碼略 ... ... System.out.println("訂閱接收結束."); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } } 非同步接收: package javayou.demo.openjms; import java.util.*; import javax.jms.*; import javax.naming.*; public class TopicSubscribeAsynchronous implements MessageListener { private TopicConnection topicConnection; private TopicSession topicSession; private Topic topic; private TopicSubscriber topicSubscriber; TopicSubscribeAsynchronous() { try { //取得JNDI上下文和連接 Hashtable properties = new Hashtable(); properties.put( Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/"); Context context = new InitialContext(properties); //取得Topic的連接工廠和連接 TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) context.lookup( "JmsTopicConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); //創建Topic的會話,用于接收信息 topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) context.lookup("topic1"); //創建Topic subscriber topicSubscriber = topicSession.createSubscriber(topic); //設置訂閱監聽 topicSubscriber.setMessageListener(this); //啟動信息接收 topicConnection.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { System.out.println("非同步定購消息的接收:"); try { TopicSubscribeAsynchronous listener = new TopicSubscribeAsynchronous(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } //收到訂閱信息后自動調用此方法 public void onMessage(Message message) { try { String messageText = null; if (message instanceof TextMessage) messageText = ((TextMessage) message).getText(); System.out.println(messageText); } catch (JMSException e) { e.printStackTrace(); } } } 編譯好后,啟動openJms服務,打開admin管理臺,為了運行方便,這里先列出三個類的運行命令: java -cp ."; -Djava.ext.dirs=."lib; javayou.demo.openjms.TopicPublish java -cp ."; -Djava.ext.dirs=."lib; javayou.demo.openjms.TopicSubscribeSynchronous java -cp ."; -Djava.ext.dirs=."lib; javayou.demo.openjms.TopicSubscribeAsynchronous 先運行2個接收命令,再運行發布命令,可以看到控制臺的Topic有消息接收,并且接收1和2都有消息接收的提示,到此完成演示,由于是非持久訂閱,所以可以看到控制臺上的Topic消息條數不會減少。
評論:
|
![]() |
|
Copyright © 智者無疆 | Powered by: 博客園 模板提供:滬江博客 |
觀音菩薩贊