Terry.Li-彬

          虛其心,可解天下之問;專其心,可治天下之學;靜其心,可悟天下之理;恒其心,可成天下之業。

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            143 隨筆 :: 344 文章 :: 130 評論 :: 0 Trackbacks

          1 DestinationResolver
          ??? DestinationResolver接口的作用是將指定的目的地名解析為目的地實例。其定義如下:

          Java代碼
          1. public ? interface ?DestinationResolver?{??
          2. ????Destination?resolveDestinationName(Session?session,?String?destinationName,???
          3. ????????boolean?pubSubDomain)?throws?JMSException;??
          4. }??

          ??? 參數pubSubDomain用于指定是使用“發布/訂閱”模式(解析后的目的地是Topic),還是使用“點對點”模式(解析后的目的地是Queue)。

          ?

          ??? CachingDestinationResolver接口繼承了DestinationResolver,增加了緩存的功能,其接口定義如下:

          Java代碼
          1. public ? interface ?CachingDestinationResolver? extends ?DestinationResolver?{??
          2. ????void?removeFromCache(String?destinationName);??
          3. ????void?clearCache();??
          4. }??

          ??? 在目的地失效的時候,removeFromCache方法會被調用;在JMS provider失效的時候,clearCache方法會被調用。

          1.1 DynamicDestinationResolver
          ??? DynamicDestinationResolver實現了DestinationResolver接口。根據指定的目的地名,DynamicDestinationResolver會動態創建目的地實例。針對JMS1.1規范,它采用如下方法創建目的地:

          Java代碼
          1. session.createTopic(topicName)??
          2. session.createQueue(queueName);??


          1.2 JndiDestinationResolver
          ??? JndiDestinationResolver繼承自JndiLocatorSupport, 同時實現了CachingDestinationResolver接口。如果在JMS provider中配置了靜態目的地,那么JndiDestinationResolver通過JNDI查找的方式獲得目的地實例。


          ??? JndiDestinationResolver的fallbackToDynamicDestination屬性用于指定在JNDI查找失敗后,是否使 用動態目的地,默認值是false。JndiDestinationResolver的cache屬性用于指定是否對目的地實例進行緩存,默認值是 true。

          ?

          1.3 BeanFactoryDestinationResolver
          ??? BeanFactoryDestinationResolver實現了DestinationResolver接口和BeanFactoryAware接口。它會根據指定的目的地名從BeanFactory中查找目的地實例。以下是相關的代碼:

          Java代碼
          1. public ?Destination?resolveDestinationName(Session?session,?String?destinationName,???
          2. boolean ?pubSubDomain)? throws ?JMSException?{??
          3. ????Assert.state(this.beanFactory?!=?null,?"BeanFactory?is?required");??
          4. ????try?{??
          5. ????????return?(Destination)?this.beanFactory.getBean(destinationName,?Destination.class);??
          6. ????}??
          7. ????catch?(BeansException?ex)?{??
          8. ????????throw?new?DestinationResolutionException(??
          9. ????????????"Failed?to?look?up?Destinaton?bean?with?name?'"?+?destinationName?+?"'",?ex);??
          10. ????}??
          11. }??

          ?

          2 JmsAccessor
          ??? 抽象類JmsAccessor是JmsTemplate、SimpleMessageListenerContainer和 DefaultMessageListenerContainer等concrete class的基類。JmsAccessor定義了如下幾個用于訪問JMS服務的共通屬性。

          Java代碼
          1. private ?ConnectionFactory?connectionFactory;??
          2. private ? boolean ?sessionTransacted?=? false ;??
          3. private ? int ?sessionAcknowledgeMode?=?Session.AUTO_ACKNOWLEDGE;??


          ??? JmsAccessor提供了創建Connection和Session的方法,如下:

          Java代碼
          1. protected ?Connection?createConnection()? throws ?JMSException?{??
          2. ????return?getConnectionFactory().createConnection();??
          3. }??
          4. ??
          5. protected ?Session?createSession(Connection?con)? throws ?JMSException?{??
          6. ????return?con.createSession(isSessionTransacted(),?getSessionAcknowledgeMode());??
          7. }??

          ?
          2.1 JmsDestinationAccessor
          ??? 抽象類JmsDestinationAccessor繼承自JmsAccessor,增加了destinationResolver和 pubSubDomain屬性。destinationResolver的默認值是DynamicDestinationResolver的實例,也就是 說默認采用動態目的地解析的方式;pubSubDomain用于指定是使用“發布/訂閱”模式還是使用“點對點”模式,默認值是false。

          ??? JmsDestinationAccessor提供了用于解析目的地的方法,如下:

          Java代碼
          1. protected ?Destination?resolveDestinationName(Session?session,?String?destinationName)???
          2. throws ?JMSException?{??
          3. ????return?getDestinationResolver().resolveDestinationName(session,?destinationName,???
          4. ????????isPubSubDomain());??
          5. }??


          2.2 AbstractJmsListeningContainer
          ??? AbstractJmsListeningContainer繼承自JmsDestinationAccessor,作為所有Message Listener Container的公共基類。它主要提供了JMS connection的生命周期管理的功能,但是沒有對消息接收的方式(主動接收方式或者異步接收方式)等做任何假定。該類主要的屬性如下:

          Java代碼
          1. private ?String?clientId;??
          2. private ?Connection?sharedConnection;??

          ??? clientId通常用于持久訂閱;sharedConnection保存了被共享的JMS connection。

          ?

          ??? 該類定義了如下的抽象方法,以便子類可以決定是否使用共享的JMS connection。

          Java代碼
          1. protected ? abstract ? boolean ?sharedConnectionEnabled();??

          ?

          2.3 AbstractMessageListenerContainer
          ??? AbstractMessageListenerContainer繼承自AbstractJmsListeningContainer,也是作為所有Message Listener Container的公共基類。該類主要的屬性如下:

          Java代碼
          1. private ? volatile ?Object?destination;??
          2. private ? volatile ?Object?messageListener;??
          3. private ? boolean ?exposeListenerSession?=? true ;??

          ??? destination用于指定接收消息的目的地。
          ??? messageListener用于指定處理消息的listener。對于messageListener,它既可以是符合JMS規范的 javax.jms.MessageListener,也可以是Spring特有的 org.springframework.jms.listener.SessionAwareMessageListener。 SessionAwareMessageListener的定義如下:

          Java代碼
          1. public ? interface ?SessionAwareMessageListener?{??
          2. ????void?onMessage(Message?message,?Session?session)?throws?JMSException;??
          3. }??

          ??? 跟javax.jms.MessageListener相比,這個接口的onMessage方法增加了一個Session 類型的參數,可以通過這個session發送回復消息(reply message)。


          ??? 如果使用了SessionAwareMessageListener 類型的message listener,那么exposeListenerSession參數指定了傳入onMessage方法的session參數是否是創建了 MessageConsumer的session,默認值是true。如果是false,那么 AbstractMessageListenerContainer會在connection上新建一個session,并傳入onMessage方法。

          2.4 AbstractPollingMessageListenerContainer
          ??? AbstractPollingMessageListenerContainer繼承自AbstractMessageListenerContainer,它提供了對于主動接收消息(polling)的支持,以及支持外部的事務管理。

          Java代碼
          1. private ? boolean ?pubSubNoLocal?=? false ;??
          2. private ? long ?receiveTimeout?=?DEFAULT_RECEIVE_TIMEOUT;??
          3. private ?PlatformTransactionManager?transactionManager;??

          ??? 如果使用“發布/訂閱”模式,那么pubSubNoLocal 屬性指定通過某個連接發送到某個Topic的消息,是否應該被投遞回這個連接。


          ??? receiveTimeout屬性用于指定調用MessageConsumer的receive方法時的超時時間,默認值是1秒。需要注意的是,這個值應該比transactionManager 中指定的事務超時時間略小。


          ??? 通常情況下,應該為transactionManager設置一個 org.springframework.transaction.jta.JtaTransactionManager的實例,此外也要設置一個支持 XA的ConnectionFactory。需要注意的是,XA 事務對性能有較大的影響。
          ??? 如果只是希望使用local JMS transaction,那么只要設置sessionTransacted為true或者使用JmsTransactionManager即可。實際上, 如果設置了非JTA的transactionManager,那么sessionTransacted屬性會自動被設置成true。
          ??? 由于local JMS transaction無法同其它local transaction(例如local database transaction)進行協調,因此客戶端程序可能需要對重發的消息進行檢查。JMS規范要求:JMS provider應該將重發消息的JMSRedelivered屬性設置為true。

          2.5 SimpleMessageListenerContainer
          ??? SimpleMessageListenerContainer繼承自AbstractMessageListenerContainer,使用異步方式 接收消息(也就是通過MessageConsumer上注冊MessageListener的方式接收消息)。該類主要的屬性如下:

          Java代碼
          1. private ? boolean ?pubSubNoLocal?=? false ;??
          2. private ? int ?concurrentConsumers?=? 1 ;??
          3. private ?Set?sessions;??
          4. private ?Set?consumers;??
          5. private ?TaskExecutor?taskExecutor;??

          ??? 如果使用“發布/訂閱”模式,那么pubSubNoLocal 屬性指定通過某個連接發送到某個Topic的消息,是否應該被投遞回這個連接。

          ?

          ??? SimpleMessageListenerContainer允許創建多個Session和MessageConsumer來接收消息。具體的個數由 concurrentConsumers屬性指定。需要注意的是,應該只是在Destination為Queue的時候才使用多個 MessageConsumer(Queue中的一個消息只能被一個Consumer接收),雖然使用多個MessageConsumer會提高消息處理 的性能,但是消息處理的順序卻得不到保證:消息被接收的順序仍然是消息發送時的順序,但是由于消息可能會被并發處理,因此消息處理的順序可能和消息發送的 順序不同。此外,不應該在Destination為Topic的時候使用多個MessageConsumer,這是因為多個 MessageConsumer會接收到同樣的消息。

          ??? SimpleMessageListenerContainer創建的Session和MessageConsumer分別保存在sessions和consumers屬性中。


          ??? taskExecutor屬性的默認值是null,也就是說,對MessageListener(或者 SessionAwareMessageListener)的回調是在MessageConsumer的內部線程中執行。如果指定了 taskExecutor,那么回調是在TaskExecutor內部的線程中執行。以下是相關的代碼:

          Java代碼
          1. protected ?MessageConsumer?createListenerConsumer( final ?Session?session)???
          2. throws ?JMSException?{??
          3. ????Destination?destination?=?getDestination();??
          4. ????if?(destination?==?null)?{??
          5. ????????destination?=?resolveDestinationName(session,?getDestinationName());??
          6. ????}??
          7. ????MessageConsumer?consumer?=?createConsumer(session,?destination);??
          8. ??
          9. ????if?(this.taskExecutor?!=?null)?{??
          10. ????????consumer.setMessageListener(new?MessageListener()?{??
          11. ????????????public?void?onMessage(final?Message?message)?{??
          12. ????????????????taskExecutor.execute(new?Runnable()?{??
          13. ????????????????????public?void?run()?{??
          14. ????????????????????????processMessage(message,?session);??
          15. ????????????????????}??
          16. ????????????????});??
          17. ????????????}??
          18. ????????});??
          19. ????}??
          20. ????else?{??
          21. ????????consumer.setMessageListener(new?MessageListener()?{??
          22. ????????????public?void?onMessage(Message?message)?{??
          23. ????????????????processMessage(message,?session);??
          24. ????????????}??
          25. ????????});??
          26. ????}??
          27. ??
          28. ????return?consumer;??
          29. }??

          ??? 需要注意的是,如果指定了taskExecutor,那么消息在被taskExecutor內部的線程處理前,可能已經被確認過了(外層的 onMessage方法可能已經執行結束了)。因此如果使用事務Session或者Session.CLIENT_ACKNOWLEDGE類型的確認模 式,那么可能會導致問題。

          ?

          ??? 該類的sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定義)總是返回true, 因此SimpleMessageListenerContainer會使用共享的JMS connection。

          2.6 DefaultMessageListenerContainer
          ??? DefaultMessageListenerContainer繼承自 AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是通過循環調用 MessageConsumer.receive的方式接收消息)。該類主要的屬性如下:

          Java代碼
          1. private ? int ?concurrentConsumers?=? 1 ;??
          2. private ? int ?maxConcurrentConsumers?=? 1 ;??
          3. private ? int ?maxMessagesPerTask?=?Integer.MIN_VALUE;??
          4. private ? int ?idleTaskExecutionLimit?=? 1 ;??
          5. private ? final ?Set?scheduledInvokers?=? new ?HashSet();??
          6. private ?TaskExecutor?taskExecutor;??
          7. private ? int ?cacheLevel?=?CACHE_AUTO;??

          ??? 跟SimpleMessageListenerContainer一樣,DefaultMessageListenerContainer也支持創建多個 Session和MessageConsumer來接收消息。跟SimpleMessageListenerContainer不同的 是,DefaultMessageListenerContainer創建了concurrentConsumers所指定個數的 AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable接口),并交給 taskExecutor運行。


          ??? maxMessagesPerTask屬性的默認值是Integer.MIN_VALUE,但是如果設置的taskExecutor(默認值是 SimpleAsyncTaskExecutor)實現了SchedulingTaskExecutor接口并且其 prefersShortLivedTasks方法返回true(也就是說該TaskExecutor傾向于短期任務),那么 maxMessagesPerTask屬性會自動被設置為10。
          ??? 如果maxMessagesPerTask屬性的值小于0,那么AsyncMessageListenerInvoker.run方法會在循環中反復嘗試 接收消息,并在接收到消息后調用MessageListener(或者SessionAwareMessageListener);如果 maxMessagesPerTask屬性的值不小于0,那么AsyncMessageListenerInvoker.run方法里最多會嘗試接收消息 maxMessagesPerTask次,每次接收消息的超時時間由其父類 AbstractPollingMessageListenerContainer的receiveTimeout屬性指定。如果在這些嘗試中都沒有接收 到消息,那么AsyncMessageListenerInvoker的idleTaskExecutionCount屬性會被累加。在run方法執行完 畢前會對idleTaskExecutionCount進行檢查,如果該值超過了 DefaultMessageListenerContainer.idleTaskExecutionLimit(默認值1),那么這個 AsyncMessageListenerInvoker可能會被銷毀。


          ??? 所有AsyncMessageListenerInvoker實例都保存在scheduledInvokers中,實例的個數可以在 concurrentConsumers和maxConcurrentConsumers之間浮動。跟 SimpleMessageListenerContainer一樣,應該只是在Destination為Queue的時候才使用多個 AsyncMessageListenerInvoker實例。

          ?

          ??? cacheLevel屬性用于指定是否對JMS資源進行緩存,可選的值是CACHE_NONE = 0、CACHE_CONNECTION = 1、CACHE_SESSION = 2、CACHE_CONSUMER = 3和CACHE_AUTO = 4。默認情況下,如果transactionManager屬性不為null,那么cacheLevel被自動設置為CACHE_NONE(不進行緩 存),否則cacheLevel被自動設置為CACHE_CONSUMER。


          ??? 如果cacheLevel屬性值大于等于CACHE_CONNECTION,那么sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定義)返回true,也就是說使用共享的JMS連接。

          ?

          ?

          3 SingleConnectionFactory
          ??? SingleConnectionFactory實現了ConnectionFactory接口,其createConnection方法總是返回相同的 Connection。可以在SingleConnectionFactory的構造函數中傳入Connection對象或者 ConnectionFactory對象,用來創建被代理的連接對象。 SingleConnectionFactory.createConnection方法返回的連接是個代理,它忽略了對stop和close方法的調用 (連接會在SingleConnectionFactory.destroy方法中關閉)。


          ??? SingleConnectionFactory的reconnectOnException屬性用來指定是否在連接拋出JMSException的時候,對連接進行重置,重置后如果再調用createConnection方法,那么會返回一個新的連接。


          ??? 需要注意的是,AbstractJmsListeningContainer類的抽象方法sharedConnectionEnabled指定了是否在 message listener container內部使用共享的JMS連接。因此通常情況下不需要為單獨的message listener container設置SingleConnectionFactory(及其子類);如果希望在不同的message listener container之間共享JMS連接,那么可以考慮使用SingleConnectionFactory。

          3.1 CachingConnectionFactory
          ??? CachingConnectionFactory繼承自SingleConnectionFactory,增加了對Session和MessageProducer緩存的功能。該類主要的屬性如下:

          Java代碼
          1. private ? int ?sessionCacheSize?=? 1 ;??
          2. private ? boolean ?cacheProducers?=? true ;??

          ??? sessionCacheSize屬性指定了被緩存的Session實例的個數(默認值是1),也就是說,如果同時請求的Session個數大于sessionCacheSize,那么這些Session不會被緩存,而是正常的被創建和銷毀。


          ??? cacheProducers屬性指定了是否對MessageProducer進行緩存,默認值是true。

          posted on 2010-09-01 22:33 禮物 閱讀(1909) 評論(0)  編輯  收藏 所屬分類: springJMS
          主站蜘蛛池模板: 财经| 玉山县| 新营市| 宜都市| 太保市| 保靖县| 白沙| 新乡市| 墨脱县| 鸡泽县| 万州区| 前郭尔| 加查县| 永顺县| 凤山县| 嘉禾县| 平和县| 平潭县| 蚌埠市| 盐城市| 海门市| 石嘴山市| 札达县| 涡阳县| 栾城县| 鹤峰县| 准格尔旗| 吴忠市| 昌邑市| 会泽县| 隆化县| 塔河县| 徐闻县| 华池县| 彩票| 乡宁县| 朔州市| 繁峙县| 宝应县| 汾阳市| 侯马市|