JMS入門(mén)介紹
JMS(Java Message Service,Java消息服務(wù))是一組Java應(yīng)用程序接口(Java API),它提供創(chuàng)建、發(fā)送、接收、讀取消息的服務(wù)。由Sun公司和它的合作伙伴設(shè)計(jì)的JMS API定義了一組公共的應(yīng)用程序接口和相應(yīng)語(yǔ)法,使得Java程序能夠和其他消息組件進(jìn)行通信。 JMS是一種與廠(chǎng)商無(wú)關(guān)的 API,用來(lái)訪(fǎng)問(wèn)消息收發(fā)系統(tǒng)。它類(lèi)似于 JDBC (Java Database Connectivity):這里,JDBC 是可以用來(lái)訪(fǎng)問(wèn)許多不同關(guān)系數(shù)據(jù)庫(kù)的 API,而 JMS 則提供同樣與廠(chǎng)商無(wú)關(guān)的訪(fǎng)問(wèn)方法,以訪(fǎng)問(wèn)消息收發(fā)服務(wù)。許多廠(chǎng)商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個(gè)例子。
JMS 使您能夠通過(guò)消息收發(fā)服務(wù)(有時(shí)稱(chēng)為消息中介程序或路由器)從一個(gè) JMS 客戶(hù)機(jī)向另一個(gè) JML 客戶(hù)機(jī)發(fā)送消息。消息是 JMS 中的一種類(lèi)型對(duì)象,由兩部分組成:報(bào)頭和消息主體。報(bào)頭由路由信息以及有關(guān)該消息的元數(shù)據(jù)組成。消息主體則攜帶著應(yīng)用程序的數(shù)據(jù)或有效負(fù)載。根據(jù)有效負(fù)載的類(lèi)型來(lái)劃分,可以將消息分為幾種類(lèi)型,它們分別攜帶:簡(jiǎn)單文本 (TextMessage)、可序列化的對(duì)象 (ObjectMessage)、屬性集合 (MapMessage)、字節(jié)流 (BytesMessage)、原始值流 (StreamMessage),還有無(wú)有效負(fù)載的消息 (Message)。
消息收發(fā)系統(tǒng)是異步的,也就是說(shuō),JMS 客戶(hù)機(jī)可以發(fā)送消息而不必等待回應(yīng)。比較可知,這完全不同于基于 RPC 的(基于遠(yuǎn)程過(guò)程的)系統(tǒng),如 EJB 1.1、CORBA 和 Java RMI 的引用實(shí)現(xiàn)。在 RPC 中,客戶(hù)機(jī)調(diào)用服務(wù)器上某個(gè)分布式對(duì)象的一個(gè)方法。在方法調(diào)用返回之前,該客戶(hù)機(jī)被阻塞;該客戶(hù)機(jī)在可以執(zhí)行下一條指令之前,必須等待方法調(diào)用結(jié)束。在 JMS 中,客戶(hù)機(jī)將消息發(fā)送給一個(gè)虛擬通道(主題或隊(duì)列),而其它 JMS 客戶(hù)機(jī)則預(yù)訂或監(jiān)聽(tīng)這個(gè)虛擬通道。當(dāng) JMS 客戶(hù)機(jī)發(fā)送消息時(shí),它并不等待回應(yīng)。它執(zhí)行發(fā)送操作,然后繼續(xù)執(zhí)行下一條指令。消息可能最終轉(zhuǎn)發(fā)到一個(gè)或許多個(gè)客戶(hù)機(jī),這些客戶(hù)機(jī)都不需要作出回應(yīng)。
JMS的通用接口集合以異步方式發(fā)送或接收消息。異步方式接收消息顯然是使用間斷網(wǎng)絡(luò)連接的客戶(hù)機(jī),諸如移動(dòng)電話(huà)和PDA的最好的選擇。另外, JMS采用一種寬松結(jié)合方式整合企業(yè)系統(tǒng)的方法,其主要的目的就是創(chuàng)建能夠使用跨平臺(tái)數(shù)據(jù)信息的、可移植的企業(yè)級(jí)應(yīng)用程序,而把開(kāi)發(fā)人力解放出來(lái)。
Java消息服務(wù)支持兩種消息模型:Point-to-Point消息(P2P)和發(fā)布訂閱消息(Publish Subscribe messaging,簡(jiǎn)稱(chēng)Pub/Sub)。JMS規(guī)范并不要求供應(yīng)商同時(shí)支持這兩種消息模型,但開(kāi)發(fā)者應(yīng)該熟悉這兩種消息模型的優(yōu)勢(shì)與缺點(diǎn)。
P2P消息模型是在點(diǎn)對(duì)點(diǎn)之間傳遞消息時(shí)使用。如果應(yīng)用程序開(kāi)發(fā)者希望每一條消息都能夠被處理,那么應(yīng)該使用P2P消息模型。與Pub/Sub消息模型不同,P2P消息總是能夠被傳送到指定的位置。
Pub/Sub模型在一到多的消息廣播時(shí)使用。如果一定程度的消息傳遞的不可靠性可以被接受的話(huà),那么應(yīng)用程序開(kāi)發(fā)者也可以使用Pub/Sub消息模型。換句話(huà)說(shuō),它適用于所有的消息消費(fèi)程序并不要求能夠收到所有的信息或者消息消費(fèi)程序并不想接收到任何消息的情況。
JMS通過(guò)允許創(chuàng)建持久訂閱來(lái)簡(jiǎn)化時(shí)間相關(guān)性,即使消息預(yù)訂者未激活也可以接收到消息。此外,使用持久訂閱還可通過(guò)隊(duì)列提供靈活性和可靠性,而仍然允許消息被發(fā)給許多的接收者。 Topic Subscriber topic Subscriber = topicSession.createDurableSubscriber(topic, subscriptionName); Connection對(duì)象表示了到兩種消息模型中的任一種的消息系統(tǒng)的連接。服務(wù)器端和客戶(hù)機(jī)端對(duì)象要求管理創(chuàng)建的JMS連接的狀態(tài)。連接是由Connection Factory創(chuàng)建的并且通過(guò)JNDI查尋定位。 //取得用于 P2P的 QueueConnectionFactory QueueConnectionFactory = queueConnectionFactory( ); Context messaging = new InitialContext( ); QueueConnectionFactory = (QueueConnectionFactory) Messaging.lookup(“QueueConnectionFactory”); //取得用于 pub/sub的 TopicConnectionFactory TopicConnectonFactory topicConnectionFactory; Context messaging = new InitialContext(); topicConnectionFactory = (TopicConnectionFactory) messaging.lookup(“TopicConnectionFactory”); 注意:用于P2P的代碼和用于PublishSubscribe的代碼非常相似。
如果session被標(biāo)記為transactional的話(huà),確認(rèn)消息就通過(guò)確認(rèn)和校正來(lái)自動(dòng)地處理。如果session沒(méi)有標(biāo)記為 transactional,你有三個(gè)用于消息確認(rèn)的選項(xiàng)。
· AUTO_ACKNOWLEDGE session將自動(dòng)地確認(rèn)收到一則消息。
· CLIENT_ACKNOWLEDGE 客戶(hù)端程序?qū)⒋_認(rèn)收到一則消息,調(diào)用這則消息的確認(rèn)方法。 · DUPS_OK_ACKNOWLEDGE 這個(gè)選項(xiàng)命令session“懶散的”確認(rèn)消息傳遞,可以想到,這將導(dǎo)致消息提供者傳遞的一些復(fù)制消息可能會(huì)出錯(cuò)。這種確認(rèn)的方式只應(yīng)當(dāng)用于消息消費(fèi)程序可以容忍潛在的副本消息存在的情況。 queueSession = queueConnection.createQueueSession(false, session.AUTO_ACKNOWLEDGE);//P2P topicSession = topicConnection.createTopicSession(false, session.AUTO_ACKNOWLEDGE); //Pub-Sub
注意:在本例中,一個(gè)session目的從連結(jié)中創(chuàng)建,非值指出session是non-transactional的,并且 session將自動(dòng)地確認(rèn)收到一則消息。
JMS現(xiàn)在有兩種傳遞消息的方式。標(biāo)記為NON_PERSISTENT的消息最多投遞一次,而標(biāo)記為PERSISTENT的消息將使用暫存后再轉(zhuǎn)送的機(jī)理投遞。如果一個(gè)JMS服務(wù)離線(xiàn),那么持久性消息不會(huì)丟失但是得等到這個(gè)服務(wù)恢復(fù)聯(lián)機(jī)時(shí)才會(huì)被傳遞。所以默認(rèn)的消息傳遞方式是非持久性的。即使使用非持久性消息可能降低內(nèi)務(wù)和需要的存儲(chǔ)器,并且這種傳遞方式只有當(dāng)你不需要接收所有的消息時(shí)才使用。
雖然 JMS規(guī)范并不需要JMS供應(yīng)商實(shí)現(xiàn)消息的優(yōu)先級(jí)路線(xiàn),但是它需要遞送加快的消息優(yōu)先于普通級(jí)別的消息。JMS定義了從0到9的優(yōu)先級(jí)路線(xiàn)級(jí)別,0是最低的優(yōu)先級(jí)而9則是最高的。更特殊的是0到4是正常優(yōu)先級(jí)的變化幅度,而5到9是加快的優(yōu)先級(jí)的變化幅度。舉例來(lái)說(shuō): topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); //Pub-Sub 或 queueSender.send(message, DeliveryMode.PERSISTENT, 8, 10000);//P2P 這個(gè)代碼片斷,有兩種消息模型,映射遞送方式是持久的,優(yōu)先級(jí)為加快型,生存周期是10000 (以毫秒度量 )。如果生存周期設(shè)置為零,這則消息將永遠(yuǎn)不會(huì)過(guò)期。當(dāng)消息需要時(shí)間限制否則將使其無(wú)效時(shí),設(shè)置生存周期是有用的。
JMS定義了五種不同的消息正文格式,以及調(diào)用的消息類(lèi)型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級(jí)別的兼容性。
· StreamMessage -- Java原始值的數(shù)據(jù)流
· MapMessage--一套名稱(chēng)-值對(duì)
· TextMessage--一個(gè)字符串對(duì)象
· ObjectMessage--一個(gè)序列化的 Java對(duì)象
· BytesMessage--一個(gè)未解釋字節(jié)的數(shù)據(jù)流
JMS應(yīng)用程序接口提供用于創(chuàng)建每種類(lèi)型消息和設(shè)置荷載的方法例如,為了在一個(gè)隊(duì)列創(chuàng)建并發(fā)送一個(gè)TextMessage實(shí)例,你可以使用下列語(yǔ)句: TextMessage message = queueSession.createTextMessage(); message.setText(textMsg); 以異步方式接收消息,需要?jiǎng)?chuàng)建一個(gè)消息監(jiān)聽(tīng)器然后注冊(cè)一個(gè)或多個(gè)使用MessageConsumer的JMS MessageListener接口。會(huì)話(huà)(主題或隊(duì)列)負(fù)責(zé)產(chǎn)生某些消息,這些消息被傳送到使用onMessage方法的監(jiān)聽(tīng)者那里。 import javax.jms.*; public class ExampleListener implements MessageListener { //把消息強(qiáng)制轉(zhuǎn)化為T(mén)extMessage格式 public void onMessage(Message message) { TextMessage textMsg = null; // 打開(kāi)并處理這段消息 } } 當(dāng)我們創(chuàng)建QueueReceiver和TopicSubscriber時(shí),我們傳遞消息選擇器字符串: //P2P QueueReceiver QueueReceiver receiver; receiver = session.createReceiver(queue, selector); //Pub-Sub TopicSubscriber TopicSubscriber subscriber; subscriber = session.createSubscriber(topic, selector); 為了啟動(dòng)消息的交付,不論是Pub/Sub還是P2P,都需要調(diào)用start方法。 TopicConnection.start( ); //pub-sub QueueConnection.start( ); //P2P TopicConnection.start ( );// pub-sub QueueConnection.start ( );// P2P
當(dāng)一條消息被捕捉時(shí),這條消息做為一條必須被強(qiáng)制轉(zhuǎn)化為適當(dāng)消息類(lèi)型的普通Message對(duì)象到達(dá)。這是一個(gè)被用來(lái)提取或打開(kāi)消息內(nèi)容的getter方法。下列代碼片段使用StreamMessage類(lèi)型。 private void unPackMessage (Message message) { String eName; String position; double rate; StreamMessage message; Message = session.createStreamMessage( ); //注意下面的代碼必須按照我給出的順序書(shū)寫(xiě) message.writeString(eName); message.writeString(position); message.writeDouble(rate); //實(shí)現(xiàn)處理消息的必要的程序邏輯 }
停止消息的傳遞,無(wú)論是Pub/Sub還是P2P,都調(diào)用stop方法。 TopicConnection.start( ); //pub-sub QueueConnection.start( ); //P2P TopicConnection.start ( );// pub-sub QueueConnection.start ( );// P2P 其他的J2EE組件--servlet或EJB--可以當(dāng)作消息生產(chǎn)者;然而,它們可能只能同步操作,這可能是因?yàn)樗鼈兊恼?qǐng)求-應(yīng)答的性質(zhì)決定的。雖然XML目前還不是被支持的消息類(lèi)型,發(fā)送一個(gè)XML文件和創(chuàng)建一條文本類(lèi)型消息以及把XML文件添加到消息的有效負(fù)載都一樣簡(jiǎn)單,都是以非專(zhuān)有的方式傳送數(shù)據(jù)。值得注意的是,一些JMS供應(yīng)廠(chǎng)商已經(jīng)提供了可用的XML消息類(lèi)型。但是使用非標(biāo)準(zhǔn)的消息類(lèi)型可能會(huì)出現(xiàn)可移植性問(wèn)題。 String reportData; //reportData內(nèi)容為XML 文檔 TextMessage message; message = session.createTextMessage(); message.setText (reportData);
消息驅(qū)動(dòng)組件(MDB)是一個(gè)當(dāng)消息到達(dá)時(shí)被容器調(diào)用的異步消息消費(fèi)程序。和entity和session EJB不同,MDB沒(méi)有本地和遠(yuǎn)程接口并且是匿名的;它們對(duì)于客戶(hù)是不可見(jiàn)的。MDB是JMS系統(tǒng)的一部分,作為消費(fèi)者實(shí)現(xiàn)服務(wù)器上的商業(yè)邏輯程序。 一個(gè)客戶(hù)程序可能通過(guò)使用JNDI定位一個(gè)與MDB相關(guān)聯(lián)的JMS。 例如: Context initialContext = new InitialContext(); Queue reportInfoQueue = (javax.jms.Queue)initialContext.lookup (“java:comp/env/jms/reportInfoQueue”); MDB是由Bean類(lèi)和相應(yīng)的XML部署描述符組成。 Bean 類(lèi)實(shí)現(xiàn)MessageDriveBean 接口: import javax.ejb.*; import jms.Message.*; public interface MessageDriveBean { public void ejbCreate(); public void ejbRemove(); public void setMessageDrivenContext(MessageDrivenContext ctx); } 消息監(jiān)聽(tīng)器接口: import javax.jms.*; public interface MessageListener { public void onMessage( ); }
部署描述符 <!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/j2ee/dtds/ejb-jar_2_0.dtd"> <ejb-jar> <enterprise-beans> <message-driven> <ejb-name>MDB</ejb-name> <ejb-class>MDB</ejb-class> <transaction-type>Container</transaction-type> <message-driven-destination> <jms-destination-type>javax.jms.Queue</jms-destination-type> </message-driven-destination> <security-identity> <run-as-specified-identity> <role-name>everyone</role-name> </run-as-specified-identity> </security-identity> </message-driven> </enterprise-beans> </ejb-jar>
既然我們現(xiàn)在已經(jīng)有了一些基本的JMS知識(shí),那么我們可以使用JMS做什么呢?任何事情都可以。
例如,分別用于銷(xiāo)售、庫(kù)存、客戶(hù)服務(wù)和賬目處理的系統(tǒng)。這些部門(mén)之間的系統(tǒng)很可能已經(jīng)存在了很長(zhǎng)時(shí)間,這些處理要求把事務(wù)移動(dòng)到系統(tǒng)中去,這并不是一個(gè)小的工作。這就是消息服務(wù)適用的地點(diǎn)。
當(dāng)售貨員完成銷(xiāo)售的時(shí)候,一條消息被發(fā)給庫(kù)存系統(tǒng);一旦訂單消息發(fā)送給收發(fā)貨人員,就可以按照訂單出貨了。當(dāng)訂單成功地發(fā)貨,系統(tǒng)將通知顧客服務(wù)和會(huì)計(jì)系統(tǒng)這個(gè)訂單已經(jīng)成功的交易了。所有對(duì)應(yīng)的每個(gè)子系統(tǒng)都自動(dòng)地根據(jù)收到的消息進(jìn)行更新。
JMS一般都不是用來(lái)整合一個(gè)系統(tǒng),而是整合許多可能參與消息驅(qū)動(dòng)環(huán)境的系統(tǒng)。JMS是一個(gè)用于開(kāi)發(fā)和集成企業(yè)應(yīng)用程序的重要的工具。因?yàn)樵S多公司都有以前遺留下來(lái)的系統(tǒng)和新近開(kāi)發(fā)的系統(tǒng)綜合起來(lái)的系統(tǒng),消息的使用是整合整個(gè)企業(yè)的重要的步驟。
JMS接口描述
JMS 支持兩種消息類(lèi)型PTP 和Pub/Sub,分別稱(chēng)作:PTP Domain 和Pub/Sub Domain,這兩種接口都繼承統(tǒng)一的JMS Parent 接口,JMS 主要接口如下所示:
JMS Parent | PTPDomain | Pub/Sub Domain |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver,QueueBrowser | TopicSubscriber |
以下是對(duì)這些接口的簡(jiǎn)單描述:
ConnectionFactory :連接工廠(chǎng),JMS 用它創(chuàng)建連接
Connection :JMS 客戶(hù)端到JMS Provider 的連接
Destination :消息的目的地
Session: 一個(gè)發(fā)送或接收消息的線(xiàn)程
MessageProducer: 由Session 對(duì)象創(chuàng)建的用來(lái)發(fā)送消息的對(duì)象
MessageConsumer: 由Session 對(duì)象創(chuàng)建的用來(lái)接收消息的對(duì)象
JMS消息模型
JMS 消息由以下幾部分組成:消息頭,屬性,消息體。
消息頭(Header) - 消息頭包含消息的識(shí)別信息和路由信息,消息頭包含一些標(biāo)準(zhǔn)的屬性如:JMSDestination,JMSMessageID 等。
消息頭
由誰(shuí)設(shè)置
JMSDestination
send 或 publish 方法
JMSDeliveryMode
send 或 publish 方法
JMSExpiration
send 或 publish 方法
JMSPriority
send 或 publish 方法
JMSMessageID
send 或 publish 方法
JMSTimestamp
send 或 publish 方法
JMSCorrelationID
客戶(hù)
JMSReplyTo
客戶(hù)
JMSType
客戶(hù)
JMSRedelivered
JMS Provider
屬性(Properties) - 除了消息頭中定義好的標(biāo)準(zhǔn)屬性外,JMS 提供一種機(jī)制增加新屬性到消息頭中,這種新屬性包含以下幾種:
1. 應(yīng)用需要用到的屬性;
2. 消息頭中原有的一些可選屬性;
3. JMS Provider 需要用到的屬性。
標(biāo)準(zhǔn)的JMS 消息頭包含以下屬性:
JMSDestination --消息發(fā)送的目的地
JMSDeliveryMode --傳遞模式, 有兩種模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示該消息一定要被送到目的地,否則會(huì)導(dǎo)致應(yīng)用錯(cuò)誤。NON_PERSISTENT 表示偶然丟失該消息是被允許的,這兩種模式使開(kāi)發(fā)者可以在消息傳遞的可靠性和吞吐量之間找到平衡點(diǎn)。
JMSMessageID 唯一識(shí)別每個(gè)消息的標(biāo)識(shí),由JMS Provider 產(chǎn)生。
JMSTimestamp 一個(gè)消息被提交給JMS Provider 到消息被發(fā)出的時(shí)間。
JMSCorrelationID 用來(lái)連接到另外一個(gè)消息,典型的應(yīng)用是在回復(fù)消息中連接到原消息。
JMSReplyTo 提供本消息回復(fù)消息的目的地址。
JMSRedelivered 如果一個(gè)客戶(hù)端收到一個(gè)設(shè)置了JMSRedelivered 屬性的消息,則表示可能該客戶(hù)端曾經(jīng)在早些時(shí)候收到過(guò)該消息,但并沒(méi)有簽收(acknowledged)。
JMSType 消息類(lèi)型的識(shí)別符。
JMSExpiration 消息過(guò)期時(shí)間,等于QueueSender 的send 方法中的timeToLive 值或TopicPublisher 的publish 方法中的timeToLive 值加上發(fā)送時(shí)刻的GMT 時(shí)間值。如果timeToLive值等于零,則JMSExpiration 被設(shè)為零,表示該消息永不過(guò)期。如果發(fā)送后,在消息過(guò)期時(shí)間之后消息還沒(méi)有被發(fā)送到目的地,則該消息被清除。
JMSPriority 消息優(yōu)先級(jí),從0-9 十個(gè)級(jí)別,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 嚴(yán)格按照這十個(gè)優(yōu)先級(jí)發(fā)送消息,但必須保證加急消息要先于普通消息到達(dá)。
消息體(Body) - JMS API 定義了5種消息體格式,也叫消息類(lèi)型,你可以使用不同形式發(fā)送接收數(shù)據(jù)并可以兼容現(xiàn)有的消息格式,下面描述這5種類(lèi)型:
消息類(lèi)型 | 消息體 |
TextMessage | java.lang.String對(duì)象,如xml文件內(nèi)容 |
MapMessage | 名/值對(duì)的集合,名是String對(duì)象,值類(lèi)型可以是Java任何基本類(lèi)型 |
BytesMessage | 字節(jié)流 |
StreamMessage | Java中的輸入輸出流 |
ObjectMessage | Java中的可序列化對(duì)象 |
Message | 沒(méi)有消息體,只有消息頭和屬性。 |
TextMessage message = queueSession.createTextMessage(); message.setText(msg_text); // msg_text is a String queueSender.send(message);
Message m = queueReceiver.receive(); if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; System.out.println("Reading message: " + message.getText()); } else { // Handle error }
posted on 2007-08-08 10:12 朱巖 閱讀(4636) 評(píng)論(1) 編輯 收藏 所屬分類(lèi): JMS文章