隨筆 - 41  文章 - 7  trackbacks - 0
          <2016年8月>
          31123456
          78910111213
          14151617181920
          21222324252627
          28293031123
          45678910

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          Consumer Tags

          從1.4.5版本開始,你可以提供一種策略來生成consumer tags.默認情況下,consumer tag是由broker來生成的.

          public interface ConsumerTagStrategy {      String createConsumerTag(String queue);  }
          該隊列是可用的,所以它可以(可選)在tag中使用。

          參考Section 3.1.15, “Message Listener Container Configuration”.

          注解驅動的監聽器Endpoints

          介紹

          從1.4版本開始,異步接收消息的最簡單方式是使用注解監聽器端點基礎設施. 簡而言之,它允許你暴露管理bean的方法來作為Rabbit 監聽器端點.

          @Component
          public class MyService {      
           @RabbitListener(queues = "myQueue")
           public void processOrder(String data) {         
               ...
            }
          }
          上面例子的含義是,當消息在org.springframework.amqp.core.Queue "myQueue"上可用時, 會調用processOrder方法(在這種情況下,帶有消息的負載).

          通過使用RabbitListenerContainerFactory,注解端點基礎設施在每個注解方法的幕后都創建了一個消息監聽器容器.在上面的例子中,myQueue 必須是事先存在的,
          并綁定了某個交換器上.從1.5.0版本開始
          ,只要在上下文中存在RabbitAdmin,隊列可自動聲明和綁定.

          @Component
          public class MyService {
          
            @RabbitListener(bindings = @QueueBinding(
                  value = @Queue(value = "myQueue", durable = "true"),
                  exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
                  key = "orderRoutingKey")
            )public void processOrder(String data) {
              ...
            }
          
            @RabbitListener(bindings = @QueueBinding(
                  value = @Queue,
                  exchange = @Exchange(value = "auto.exch"),
                  key = "invoiceRoutingKey")
            )public void processInvoice(String data) {
              ...
            }
          
          }

          在第一個例子中,隊列myQueue 會與交換器一起自動聲明(持久化的), 如果需要,可使用路由鍵來綁定到交換器上.在第二個例子中,匿名(專用的,自動刪除的)隊列將會聲明并綁定.
          可提供多個 QueueBinding 條目,允許監聽器監聽多個隊列.

          當前只支持DIRECT, FANOUT, TOPIC 和HEADERS的交換器類型.當需要高級配置時,可使用@Bean 定義.

          注意第一個例子中交換器上的 ignoreDeclarationExceptions .這允許,例如, 綁定到有不同的設置(如.internal)的交換器上. 默認情況下,現有交換器的屬性必須被匹配.

          從1.6版本開始,你可為隊列,交換器和綁定的@QueueBinding 注解中指定參數.示例:

          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue(value = "auto.headers", autoDelete = "true",
                                  arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                          type = "java.lang.Integer")),
                  exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
                  arguments = {
                          @Argument(name = "x-match", value = "all"),
                          @Argument(name = "foo", value = "bar"),
                          @Argument(name = "baz")
                  })
          )
          public String handleWithHeadersExchange(String foo) { ... }

          注意隊列的x-message-ttl 參數設為了10秒鐘,因為參數類型不是String, 因此我們指定了它的類型,在這里是Integer.有了這些聲明后,如果隊列已經存在了,參數必須匹配現有隊列上的參數.對于header交換器,我們設置binding arguments 要匹配頭中foo為bar,且baz可為任意值的消息. x-match 參數則意味著必須同時滿足兩個條件.

          參數名稱,參數值,及類型可以是屬性占位符(${...}) 或SpEL 表達式(#{...}). name 必須要能解析為String; type的表達式必須能解析為Class 或類的全限定名. value 必須能由DefaultConversionService 類型進行轉換(如上面例子中x-message-ttl).

          如果name 解析為null 或空字符串,那么將忽略 @Argument.

          元注解(Meta-Annotations)

          有時,你想將同樣的配置用于多個監聽器上. 為減少重復配置,你可以使用元注解來創建你自己的監聽器注解:

          @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
          @Retention(RetentionPolicy.RUNTIME)
          @RabbitListener(bindings = @QueueBinding(
                  value = @Queue,
                  exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
          public@interface MyAnonFanoutListener {
          }
          
          public class MetaListener {
          
              @MyAnonFanoutListener
          public void handle1(String foo) {
                  ...
              }
          
              @MyAnonFanoutListener
          public void handle2(String foo) {
                  ...
              }
          
          }

          在這個例子中,每個通過@MyAnonFanoutListener創建的監聽器都會綁定一個匿名,自動刪除的隊列到fanout交換器 metaFanout. 元注解機制是簡單的,在那些用戶定義注解中的屬性是不會經過檢查的- 因此你不能從元注解中覆蓋設置.當需要高級配置時,使用一般的 @Bean 定義.

          Enable Listener Endpoint Annotations

          為了啟用 @RabbitListener 注解,需要在你的某個@Configuration類中添加@EnableRabbit 注解.

          @Configuration
          @EnableRabbit
          publicclass AppConfig {
          
              @Bean
          public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
                  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
                  factory.setConnectionFactory(connectionFactory());
                  factory.setConcurrentConsumers(3);
                  factory.setMaxConcurrentConsumers(10);
                  return factory;
              }
          }

          默認情況下,基礎設施會查找一個名為rabbitListenerContainerFactory 的bean作為工廠來源來創建消息監聽器容器. 在這種情況下,會忽略RabbitMQ 基礎設施計劃, processOrder 方法可使用核心輪詢大小為3個線程最大10個線程的池大小來調用.

          可通過使用注解或實現RabbitListenerConfigurer

          接口來自定義監聽器容器工廠. 默認只需要注冊至少一個Endpoints,而不需要一個特定的容器工廠.查看javadoc來了解詳情和例子.

          如果你更喜歡XML配置,可使用 <rabbit:annotation-driven> 元素.

          <rabbit:annotation-driven/>
          <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
          <property name="connectionFactory" ref="connectionFactory"/>
          <property name="concurrentConsumers "value="3"/>
          <property name="maxConcurrentConsumers"value="10"/>
          </bean>
          注解方法的消息轉換

          在調用監聽器之前,在管道中有兩個轉換步驟. 第一個使用 MessageConverter 來將傳入的Spring AMQP Message 轉換成spring-消息系統的消息. 當目標方法調用時,消息負載將被轉換,如果有必要,也會參考消息參數類型來進行.

          第一步中的默認 MessageConverter 是一個Spring AMQP SimpleMessageConverter ,它可以處理String 和 java.io.Serializable對象之間的轉換; 其它所有的將保留為byte[]. 在下面的討論中,我們稱其為消息轉換器.

          第二個步驟的默認轉換器是GenericMessageConverter ,它將委派給轉換服務(DefaultFormattingConversionService的實例). 在下面的討論中,我們稱其為方法參數轉換器.

          要改變消息轉換器,可在連接工廠bean中設置其相關屬性:

          @Bean
          public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
              SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
              ...
              factory.setMessageConverter(new Jackson2JsonMessageConverter());
              ...
              return factory;
          }

          這配置了一個Jackson2 轉換器,希望頭信息能通過它來指導轉換.

          你也可以考慮使用ContentTypeDelegatingMessageConverter ,它可以處理不同內容類型的轉換.

          大多數情況下,沒有必要來定制方法參數轉換器,除非你想要用自定義的ConversionService.

          在1.6版本之前,用于轉換JSON的類型信息必須在消息頭中提供或者需要一個自定義的ClassMapper. 從1.6版本開始,如果沒有類型信息頭,類型可根據目標方法參數推斷.

          類型推斷只能用于 @RabbitListener 的方法級.

          參考 the section called “Jackson2JsonMessageConverter” 來了解更多信息.

          如果您希望自定義方法參數轉換器,您可以這樣做如下:
          @Configuration
          @EnableRabbit
          public class AppConfig implements RabbitListenerConfigurer {
          
              ...
          
              @Bean
          public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
              	DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
              	factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
              	return factory;
              }
          
              @Bean
          public ConversionService myConversionService() {
              	DefaultConversionService conv = new DefaultConversionService();
              	conv.addConverter(mySpecialConverter());
              	return conv;
              }
          
              @Override
          publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
              	registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
              }
          
              ...
          
          }
          重要
          對于多方法監聽器(參考 the section called “Multi-Method Listeners”), 方法選擇是基于消息轉換后的消息負載,方法參數轉換器只在方法被選擇后才會調用.
          編程式 Endpoint 注冊

          RabbitListenerEndpoint 提供了一個Rabbit endpoint 模型并負責為那個模型配置容器.除了通過RabbitListener注解檢測外這個基礎設施允許你通過編程來配置endpoints.

          @Configuration
          @EnableRabbit
          publicclass AppConfig implements RabbitListenerConfigurer {
          
              @Override
          publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
                  SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
                  endpoint.setQueueNames("anotherQueue");
                  endpoint.setMessageListener(message -> {
                      // processing
                  });
                  registrar.registerEndpoint(endpoint);
              }
          }

          在上面的例子中,我們使用了SimpleRabbitListenerEndpoint (它使用MessageListener 來進行處理),但你也可以構建你自己的endpoint變種來描述自定義的調用機制.

          應該指出的是,你也可以跳過@RabbitListener 的使用,通過RabbitListenerConfigurer來編程注冊你的endpoints.

          Annotated Endpoint Method Signature
          到目前為止,我們已經在我們的端點上注入了一個簡單的字符串,但它實際上可以有一個非常靈活的方法簽名。讓我們重寫它,以一個自定義的頭來控制注入順序:
          @Component
          publicclass MyService {
          
              @RabbitListener(queues = "myQueue")
          publicvoid processOrder(Order order, @Header("order_type") String orderType) {
                  ...
              }
          }

          下面是你可以在監聽端點上注入的主要元素:

          原生org.springframework.amqp.core.Message.

          用于接收消息的com.rabbitmq.client.Channel

           org.springframework.messaging.Message 代表的是傳入的AMQP消息.注意,這個消息持有自定義和標準的頭部信息 (AmqpHeaders定義).


          從1.6版本開始, 入站deliveryMode 頭可以AmqpHeaders.RECEIVED_DELIVERY_MODE 使用,代替了AmqpHeaders.DELIVERY_MODE.

          @Header-注解方法參數可 提取一個特定頭部值,包括標準的AMQP頭.

          @Headers-注解參數為了訪問所有頭信息,必須能指定為java.util.Map.

          非注解元素(非支持類型(如. Message 和Channel))可認為是負荷(payload).你可以使用 @Payload來明確標識. 你也可以添加額外的 @Valid來進行驗證.

          注入Spring消息抽象的能力是特別有用的,它可受益于存儲在特定傳輸消息中的信息,而不需要依賴于特定傳輸API.

          @RabbitListener(queues = "myQueue")
          public void processOrder(Message<Order> order) { ...
          }

          方法參數的處理是由DefaultMessageHandlerMethodFactory 提供的,它可以更進一步地定制以支持其它的方法參數. 轉換和驗證支持也可以定制.

          例如,如果我們想確保我們的Order在處理之前是有效的,我們可以使用@Valid 來注解負荷,并配置必須驗證器,就像下面這樣:

          @Configuration
          @EnableRabbit
          public class AppConfig implements RabbitListenerConfigurer {
          
              @Override
          publicvoid configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
                  registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
              }
          
              @Bean
          public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
                  DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
                  factory.setValidator(myValidator());
                  return factory;
              }
          }
          監聽多個隊列

          當使用queues 屬性時,你可以指定相關的容器來監聽多個隊列. 你可以使用 @Header 注解來指定對于那些隊列中收到的消息對POJO方法可用:

          @Component
          public class MyService {
          
              @RabbitListener(queues = { "queue1", "queue2" } )
          public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
                  ...
              }
          
          }

          從1.5版本開始,隊列名稱可以使用屬性占位符和SpEL:

          @Component
          public class MyService {
          
              @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
          public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
                  ...
              }
          
          }

          1.5版本之前,只有單個隊列可以這種方法進行指定,每個隊列需要一個單獨的屬性.

          回復管理

          MessageListenerAdapter 現有的支持已經允許你的方法有一個非void的返回類型.在這種情況下,調用的結果被封裝在一個發送消息中,其消息發送地址要么是原始消息的ReplyToAddress頭指定的地址要么是監聽器上配置的默認地址.默認地址現在可通過@SendTo 注解進行設置.

          假設我們的processOrder 方法現在需要返回一個OrderStatus, 可將其寫成下面這樣來自動發送一個回復:

          @RabbitListener(destination = "myQueue")
          @SendTo("status")
          public OrderStatus processOrder(Order order) {
              // order processing
           return status;
          }

          如果你需要以傳輸獨立的方式來設置其它頭,你可以返回Message,就像這樣:

          @RabbitListener(destination = "myQueue")
          @SendTo("status")
          public Message<OrderStatus> processOrder(Order order) {
              // order processing
          return MessageBuilder
                  .withPayload(status)
                  .setHeader("code", 1234)
                  .build();
          }

          @SendTo 值按照exchange/routingKey模式(其中的一部分可以省略)來作為對exchange 和 routingKey 的回復.有效值為:

          foo/bar - 以交換器和路由鍵進行回復.

          foo/ - 以交換器和默認路由鍵進行回復.

          bar or /bar - 以路由鍵和默認交換器進行回復.

          / or empty - 以默認交換器和默認路由鍵進行回復.

           @SendTo 也可以沒有value 屬性. 這種情況等價于空的sendTo 模式. @SendTo 只能應用于沒有replyToAddress 屬性的入站消息中.

          從1.5版本開始, @SendTo 值可以通過bean SpEL 表達式初始化,例如…​

          @RabbitListener(queues = "test.sendTo.spel")
          @SendTo("#{spelReplyTo}")
          public String capitalizeWithSendToSpel(String foo) {
              return foo.toUpperCase();
          }
          ...
          @Bean
          public String spelReplyTo() {
              return"test.sendTo.reply.spel";
          }

          表達式必須能評估為String,它可以是簡單的隊列名稱(將發送到默認交換器中) 或者是上面談到的exchange/routingKey 形式.

          在初始化時,#{...} 表達式只評估一次.

          對于動態路由回復,消息發送者應該包含一個reply_to 消息屬性或使用運行時SpEL 表達式.

          從1.6版本開始, @SendTo 可以是SpEL 表達式,它可在運行時根據請求和回復來評估:

          @RabbitListener(queues = "test.sendTo.spel")
          @SendTo("!{'some.reply.queue.with.' + result.queueName}")
          public Bar capitalizeWithSendToSpel(Foo foo) {
              return processTheFooAndReturnABar(foo);
          }

          SpEL 表達式的運行時性質是由 !{...} 定界符表示的. 表達式評估上下文的#root 對象有三個屬性:

          • request - o.s.amqp.core.Message 請求對象.
          • source - 轉換后的 o.s.messaging.Message<?>.
          • result - 方法結果.

          上下文有一個map 屬性訪問器,標準類型轉換器以及一個bean解析器,允許引用上下文中的其它beans (如.@someBeanName.determineReplyQ(request, result)).

          總結一下, #{...} 只在初始化的時候評估一次, #root 對象代表的是應用程序上下文; beans可通過其名稱來引用. !{...} 會在運行時,對于每個消息,都將使用root對象的屬性進行評估,bean可以使用其名稱進行引用,前輟為@.

          多方法監聽器

          從1.5.0版本開始,@RabbitListener 注解現在可以在類級上進行指定.與新的@RabbitHandler 注解一起,基于傳入消息的負荷類型,這可以允許在單個監聽器上調用不同的方法.這可以用一個例子來描述:

          @RabbitListener(id="multi", queues = "someQueue")
          publicclass MultiListenerBean {
          
              @RabbitHandler
          @SendTo("my.reply.queue")
          public String bar(Bar bar) {
                  ...
              }
          
              @RabbitHandler
          public String baz(Baz baz) {
                  ...
              }
          
              @RabbitHandler
          public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
                  ...
              }
          
          }

          在這種情況下,獨立的 @RabbitHandler 方法會被調用,如果轉換后負荷是BarBaz 或Qux. 理解基于負荷類型系統來確定唯一方法是很重要的.類型檢查是通過單個無注解參數來執行的,否則就要使用@Payload 進行注解. 注意同樣的方法簽名可應用于方法級 @RabbitListener 之上.

          注意,如果有必要,需要在每個方法上指定@SendTo, 在類級上它是不支持的.

          @Repeatable @RabbitListener

          從1.6版本開始,@RabbitListener 注解可用 @Repeatable進行標記. 這就是說,這個注解可多次出現在相同的注解元素上(方法或類).在這種情況下,對于每個注解,都會創建獨立的監聽容器,它們每個都會調用相同的監聽器@Bean. Repeatable 注解能用于 Java 8+;當在Java 7-使用時,同樣的效果可以使用 @RabbitListeners "container" 注解(包含@RabbitListener注解的數組)來達到.

          Proxy @RabbitListener and Generics

          如果你的服務是用于代理(如,在 @Transactional的情況中) ,當接口有泛型參數時,需要要一些考慮.要有一個泛型接口和特定實現,如:

          interface TxService<P> {
          
             String handle(P payload, String header);
          
          }
          
          static class TxServiceImpl implements TxService<Foo> {
          
              @Override
           @RabbitListener(...)
           public String handle(Foo foo, String rk) {
                   ...
              }
          
          }

          你被迫切換到CGLIB目標類代理,因為接口handle方法的實際實現只是一個橋接方法.在事務管理的情況下, CGLIB是通過注解選項來配置的- @EnableTransactionManagement(proxyTargetClass = true). 在這種情況下,所有注解都需要在實現類的目標方法上進行聲明:

          static class TxServiceImpl implements TxService<Foo> {
          
           @Override
          @Transactional
          @RabbitListener(...)
          public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
                  ...
              }
          
          }
          容器管理

          由注解創建的容器不會在上下文中進行注冊.你可以調用 RabbitListenerEndpointRegistrygetListenerContainers()方法來獲取所有容器集合.然后,你可以迭代這個集合,例如,停止/啟動所有容器或調用在其注冊上調用Lifecycle 方法(調用每個容器中的操作).

          你也可以使用id來獲取單個容器的引用,即 getListenerContainer(String id); 例如registry.getListenerContainer("multi") .

          從1.5.2版本開始,你可以調用getListenerContainerIds()方法來獲取所有注冊容器的id.

          從1.5版本開始,你可在RabbitListener端點上為容器分配一個組(group).這提供了一種機制來獲取子集容器的引用; 添加一個group 屬性會使Collection<MessageListenerContainer> 類型的bean使用組名稱注冊在上下文中.

          線程和異步消費者

          一些不同的線程可與異步消費者關聯。

          當RabbitMQ Client投遞消息時,來自于SimpleMessageListener 配置的TaskExecutor中的線程會調用MessageListener.如果沒有配置,將會使用SimpleAsyncTaskExecutor. 如果使用了池化的executor,須確保池大小可以支撐并發處理.

          當使用默認SimpleAsyncTaskExecutor時,對于調用監聽器的線程,監聽器容器的beanName 將用作threadNamePrefix. 這有益于日志分析,在日志appender配置中,一般建議總是包含線程名稱.當在SimpleMessageListenerContainertaskExecutor屬性中指定TaskExecutor 時,線程名稱是不能修改的.建議你使用相似的技術來命名線程, 幫助在日志消息中的線程識別。

          當創建連接時,在CachingConnectionFactory 配置的Executor將傳遞給RabbitMQ Client ,并且它的線程將用于投遞新消息到監聽器容器.在寫作的時候,如果沒有配置,client會使用池大小為5的內部線程池executor.

          RabbitMQ client 使用ThreadFactory 來為低端I/O(socket)操作創建線程.要改變這個工廠,你需要配置底層RabbitMQ ConnectionFactory, 正如the section called “Configuring the Underlying Client Connection Factory”中所描述.

          檢測空閑異步消費者

          雖然高效,但異步消費者存在一個問題:如何來探測它們什么是空閑的 - 當有一段時間沒有收到消息時,用戶可能想要采取某些動作.

          從1.6版本開始, 當沒有消息投遞時,可配置監聽器容器來發布ListenerContainerIdleEvent 事件. 當容器是空閑的,事件會每隔idleEventInterval 毫秒發布事件.

          要配置這個功能,須在容器上設置idleEventInterval:

          xml
          <rabbit:listener-container connection-factory="connectionFactory"...idle-event-interval="60000"...
                  >
          <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
          </rabbit:listener-container>
          Java
          @Bean
          public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
              SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
              ...
              container.setIdleEventInterval(60000L);
              ...
              return container;
          }
          @RabbitListener
          @Bean
          public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
              SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
              factory.setConnectionFactory(rabbitConnectionFactory());
              factory.setIdleEventInterval(60000L);
              ...
              return factory;
          }

          在上面這些情況中,當容器空閑時,每隔60秒就會發布事件.

          事件消費

          通過實現ApplicationListener 可捕獲這些事件- 要么是一個一般的監聽器,要么是一個窄化的只接受特定事件的監聽器. 你也可以使用Spring Framework 4.2中引入的@EventListener.

          下面的例子在單個類中組合使用了@RabbitListener 和@EventListener .重點要理解,應用程序監聽器會收到所有容器的事件,因此如果你只對某個容器采取措施,那么你需要檢查監聽器id.你也可以使用@EventListener 條件來達到此目的.

          事件有4個屬性:

          • source - 監聽容器實例
          • id - 監聽器id(或容器bean名稱)
          • idleTime - 當事件發布時,容器已經空閑的時間
          • queueNames - 容器監聽的隊列名稱
          public class Listener {
          
              @RabbitListener(id="foo", queues="#{queue.name}")
              public String listen(String foo) {
                  return foo.toUpperCase();
              }
          
              @EventListener(condition = "event.listenerId == 'foo'")
              public void onApplicationEvent(ListenerContainerIdleEvent event) {
                  ...
              }
          
          }
          重要
          事件監聽器會查看所有容器的事件,因此,在上面的例子中,我們根據監聽器ID縮小了要接收的事件.
          警告
          如果你想使用idle事件來停止監聽器容器,你不應該在調用監聽器的線程上來調用container.stop() 方法- 它會導致延遲和不必要的日志消息. 相反,你應該把事件交給一個不同的線程,然后可以停止容器。

          3.1.7 消息轉換器

          介紹

          AmqpTemplate 同時也定義了多個發送和接收消息(委派給MessageConverter)的方法.

          MessageConverter 本身是很簡單的. 在每個方向上它都提供了一個方法:一個用于轉換成Message,另一個用于從Message中轉換.注意,當轉換成Message時,除了object外,你還需要提供消息屬性. "object"參數通常對應的是Message body.

          public interface MessageConverter {
          
              Message toMessage(Object object, MessageProperties messageProperties)
                      throws MessageConversionException;
          
              Object fromMessage(Message message) throws MessageConversionException;
          
          }

          AmqpTemplate中相關的消息發送方法列舉在下邊. 這比我們前面提到的要簡單,因為它們不需要Message 實例. 相反地,  MessageConverter 負責創建每個消息(通過將提供的對象轉換成Message body的字節數組,以及添加提供的MessageProperties).

          void convertAndSend(Object message) throws AmqpException;
          
          void convertAndSend(String routingKey, Object message) throws AmqpException;
          
          void convertAndSend(String exchange, String routingKey, Object message)
              throws AmqpException;
          
          void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
              throws AmqpException;
          
          void convertAndSend(String routingKey, Object message,
              MessagePostProcessor messagePostProcessor) throws AmqpException;
          
          void convertAndSend(String exchange, String routingKey, Object message,
              MessagePostProcessor messagePostProcessor) throws AmqpException;

          在接收端,這里只有兩個方法:一個接受隊列名稱,另一個依賴于模板設置的隊列屬性.

          Object receiveAndConvert() throws AmqpException;
          
          Object receiveAndConvert(String queueName) throws AmqpException;
          在 the section called “Asynchronous Consumer” 中提到的MessageListenerAdapter也使用了MessageConverter.

          SimpleMessageConverter

          MessageConverter 策略的默認實現被稱為SimpleMessageConverter. 如果你沒有明確配置,RabbitTemplate實例會使用此轉換器的實例.它能處理基于文本內容,序列化Java對象,以及簡單的字節數組.

          從 Message中轉換

          如果傳入消息的內容類型以"text" (如. "text/plain")開頭,它同時也會檢查內容編碼屬性,以確定將消息body字節數組轉換成字符串所要使用的字符集. 如果在輸入消息中沒有指定內容編碼屬性, 它默認會使用"UTF-8"字符集.如果你需要覆蓋默認設置,你可以配置一個SimpleMessageConverter實例,設置其"defaultCharset" 屬性,再將其注入到RabbitTemplate 實例中.

          如果傳入消息的內容類型屬性值為"application/x-java-serialized-object", SimpleMessageConverter 將嘗試將字節數組反序列化為一個Java object. 雖然這對于簡單的原型是有用的,但一般不推薦依賴于Java序列化機制,因為它會生產者和消費者之間的緊密耦合。當然,這也排除了在兩邊使用非Java的可能性.由于AMQP 是線路級協議, 因這樣的限制失去了許多優勢,這是不幸的. 在后面的兩個章節中,我們將探討通過豐富的域對象的內容來替代java序列化.

          對于其它內容類型,SimpleMessageConverter 會以字節數組形式直接返回消息body內容.

          參考the section called “Java Deserialization” 來了解更多信息.

          轉換成消息

          當從任意Java對象轉換成Message時, SimpleMessageConverter 同樣可以處理字節數組,字符串,以及序列化實例.它會將每一種都轉換成字節(在字節數組的情況下,不需要任何轉換), 并且會相應地設置內容類型屬性.如果要轉換的對象不匹配這些類型,Message body 將是null.

          SerializerMessageConverter

          除了它可以使用其它application/x-java-serialized-object轉換的Spring框架Serializer 和 Deserializer 實現來配置外,此轉換器類似于SimpleMessageConverter

          參考the section called “Java Deserialization” 來了解更多信息.

          Jackson2JsonMessageConverter

          轉換成消息

          正如前面章節提到的,一般來說依賴于Java序列化機制不是推薦的.另一個常見更靈活且可跨語言平臺的選擇JSON (JavaScript Object Notation).可通過在RabbitTemplate實例上配置轉換器來覆蓋默認SimpleMessageConverter.Jackson2JsonMessageConverter 使用的是com.fasterxml.jackson 2.x 包.

          <bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
          <property name="connectionFactory" ref="rabbitConnectionFactory"/>
          <property name="messageConverter">
          <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
          <!-- if necessary, override the DefaultClassMapper -->
          <property name="classMapper"  ref="customClassMapper"/>
          </bean>
          </property>
          </bean>

          正如上面展示的, Jackson2JsonMessageConverter 默認使用的是DefaultClassMapper. 類型信息是添加到MessageProperties中的(也會從中獲取). 如果入站消息在MessageProperties沒有包含類型信息,但你知道預期類型,你可以使用defaultType 屬性來配置靜態類型

          <bean id="jsonConverterWithDefaultType" class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
          <property name="classMapper">
          <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
          <property name="defaultType" value="foo.PurchaseOrder"/>
          </bean>
          </property>
          </bean>
          轉換Message

          入站消息會根據發送系統頭部中添加的類型信息來轉換成對象.

          在1.6之前的版本中,如果不存在類型信息,轉換將失敗。從1.6版開始,如果類型信息丟失,轉換器將使用Jsckson默認值(通常是一個map)來轉換JSON.
          此外,從1.6版本開始,當在方法上使用@RabbitListener 注解時, 推斷類型信息會添加到MessageProperties; 這允許轉換器轉換成目標方法的參數類型.這只適用于無注解的參數或使用@Payload注解的單個參數. 在分析過程中忽略類型消息的參數。

          重要
          默認情況下,推斷類型信息會覆蓋inbound __TypeId__ 和發送系統創建的相關headers. 這允許允許接收系統自動轉換成不同的領域對象. 這只適用于具體的參數類型(不是抽象的或不是接口)或者來自java.util 包中的對象.其它情況下,將使用 __TypeId__ 和相關的頭.也可能有你想覆蓋默認行為以及總是使用__TypeId__信息的情況. 例如, 讓我們假設你有一個接受Foo參數的@RabbitListener ,但消息中包含了Bar( 它是的Foo (具體類)的子類). 推斷類型是不正確的.要處理這種情況,需要設置Jackson2JsonMessageConverter 的TypePrecedence 屬性為TYPE_ID 而替換默認的INFERRED. 這個屬性實際上轉換器的DefaultJackson2JavaTypeMapper ,但為了方便在轉換器上提供了一個setter方法. 如果你想注入一個自定義類型mapper, 你應該設置屬性mapper.
          @RabbitListener
          public void foo(Foo foo) {...}
          
          @RabbitListener
          public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}
          
          @RabbitListener
          public void foo(Foo foo, o.s.amqp.core.Message message) {...}
          
          @RabbitListener
          public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}
          
          @RabbitListener
          public void foo(Foo foo, String bar) {...}
          
          @RabbitListener
          public void foo(Foo foo, o.s.messaging.Message<?> message) {...}

          上面前4種情況下,轉換器會嘗試轉換成Foo 類型. 第五個例子是無效的,因為我們不能確定使用哪個參數來接收消息負荷. 在第六個例子中, Jackson 會根據泛型WildcardType來應用.

          然而,你也可以創建一個自定義轉換器,并使用targetMethod 消息屬性來決定將JSON轉換成哪種類型.

          這種類型接口只能在@RabbitListener 注解聲明在方法級上才可實現.在類級@RabbitListener, 轉換類型用來選擇調用哪個@RabbitHandler 方法.基于這個原因,基礎設施提供了targetObject 消息屬性,它可用于自定義轉換器來確定類型.

          MarshallingMessageConverter

          還有一個選擇是MarshallingMessageConverter.它會委派到Spring OXM 包的 Marshaller 和 Unmarshaller 策略接口實現. 

          你可從here了解更多. 在配置方面,最常見的是只提供構造器參數,因為大部分Marshaller 的實現都將實現Unmarshaller.

          <bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
          <property name="connectionFactory" ref="rabbitConnectionFactory"/>
          <property name="messageConverter">
          <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
          <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
          </bean>
          </property>
          </bean>

          ContentTypeDelegatingMessageConverter

          這個類是在1.4.2版本中引入的,并可基于MessagePropertiescontentType屬性允許委派給一個特定的MessageConverter.默認情況下,如果沒有contentType屬性或值沒有匹配配置轉換器時,它會委派給SimpleMessageConverter.

          <bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
          <property name="delegates">
          <map>
          <entry key="application/json" value-ref="jsonMessageConverter" />
          <entry key="application/xml" value-ref="xmlMessageConverter" />
          </map> 
          </property>
          </bean>

          Java 反序列化

          重要
          當從不可信任的來源反序列化Java對象時,存在一個可能的漏洞.如果從不可信來源,使用內容類型 application/x-java-serialized-object來接收消息,你可以考慮配置允許哪些包/類能反序列化.這既適用于SimpleMessageConverter,也適用于SerializerMessageConverter,當它被配置為使用一個DefaultDeserializer時 -或含蓄地或通過配置方式的。

          默認情況下,白名單列表是空的,這意味著所有類都會反序列化.你可以設置模式列表,如 foo.*foo.bar.Baz 或 *.MySafeClass.模式會按照順序進行檢查,直到找到匹配的模式.如果沒有找到匹配,將拋出SecurityException.在這些轉換器上,可使用whiteListPatterns 屬性來設置.

          消息屬性轉換器

           MessagePropertiesConverter 策略接口用于Rabbit Client BasicProperties 與Spring AMQP MessageProperties之間轉換. 默認實現(DefaultMessagePropertiesConverter)通常可滿雖大部分需求,但如果有需要,你可以自己實現. 當大小不超過1024字節時,默認屬性轉換器將 BasicProperties 中的LongString 轉換成String . 更大的 LongString 將不會進行轉換(參考下面的內容.這個限制可通過構造器參數來覆蓋.

          從1.6版本開始, 現在headers 長超過 long string 限制(默認為1024) 將被DefaultMessagePropertiesConverter保留作為 LongString . 你可以通過 the getBytes[]toString(), 或getStream() 方法來訪問內容.

          此前, DefaultMessagePropertiesConverter 會將這樣的頭轉換成一個 DataInputStream (實際上它只是引用了LongStringDataInputStream). 在輸出時,這個頭不會進行轉換(除字符串外,如在流上調用toString()方法 java.io.DataInputStream@1d057a39).

          更大輸入LongString 頭現在可正確地轉換,在輸出時也一樣.

          它提供了一個新的構造器來配置轉換器,這樣可像以前一樣來工作:

          /**
           * Construct an instance where LongStrings will be returned
           * unconverted or as a java.io.DataInputStream when longer than this limit.
           * Use this constructor with 'true' to restore pre-1.6 behavior.
           * @param longStringLimit the limit.
           * @param convertLongLongStrings LongString when false,
           * DataInputStream when true.
           * @since 1.6
           */
          public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

          另外,從1.6版本開始,在 MessageProperties中添加了一個新屬性correlationIdString.此前,當在RabbitMQ 客戶端中轉換BasicProperties 時,將會執行不必要的byte[] <-> String 轉換,這是因為 MessageProperties.correlationId 是一個byte[] 而 BasicProperties 使用的是String

          (最終,RabbitMQ客戶端使用UTF-8字符串轉化為字節并放在協議消息中).


          為提供最大向后兼容性,新屬性correlationIdPolicy 已經被加入到了DefaultMessagePropertiesConverter.它接受DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚舉參數.

          默認情況下,它設置為BYTES (復制先前的行為).

          對于入站消息:

          • STRING - 只映射correlationIdString 屬性
          • BYTES - 只映射correlationId 屬性
          • BOTH - 會同時映射兩個屬性
          對于出站消息:
          • STRING - 只映射correlationIdString 屬性
          • BYTES - 只映射correlationId 屬性
          • BOTH - 兩種屬性都會考慮,但會優先考慮String 屬性

          也從1.6版本開始,入站deliveryMode 屬性不再需要映射 MessageProperties.deliveryMode,相反使用MessageProperties.receivedDeliveryMode 來代替.另外,入站userId 屬性也不需要再映射MessageProperties.userId,相反使用MessageProperties.receivedUserId 來映射. 

          這種變化是為了避免這些屬性的意外傳播,如果同樣的MessageProperties 對象用于出站消息時.

          3.1.8 修改消息- 壓縮以及更多

          提供了許多的擴展點,通過它們你可以對消息執行預處理,要么在發送RabbitMQ之前,要么在接收到消息之后.

          正如你在Section 3.1.7, “Message Converters”看到的,這樣的擴展點存在于AmqpTemplate convertAndReceive 操作中,在那里你可以提供一個MessagePostProcessor

          例如,你的POJO轉換之后, MessagePostProcessor 允許你在Message上設置自定義的頭或屬性.

          從1.4.2版本開始,額外的擴展點已經添加到RabbitTemplate - setBeforePublishPostProcessors() 和setAfterReceivePostProcessors(). 第一個開啟了一個post processor來在發送消息到RabbitMQ之前立即運行.當使用批量時(參考 the section called “Batching”), 這會在批處理裝配之后發送之前調用. 

          第二個會在收到消息后立即調用.

          這些擴展點對于壓縮這此功能是有用的,基于這些目的,提供了多個MessagePostProcessor:

          • GZipPostProcessor
          • ZipPostProcessor

          針對于發送前的消息壓縮,以及

          • GUnzipPostProcessor
          • UnzipPostProcessor

          針對于消息解壓.

          類似地, SimpleMessageListenerContainer 也有一個 setAfterReceivePostProcessors() 方法,

          允許在消息收到由容器來執行解壓縮.


          posted on 2016-08-13 12:48 胡小軍 閱讀(13052) 評論(0)  編輯  收藏 所屬分類: RabbitMQ
          主站蜘蛛池模板: 屏南县| 宾川县| 宣汉县| 兴业县| 葫芦岛市| 土默特右旗| 芷江| 浦北县| 弥勒县| 孝感市| 承德市| 海门市| 乾安县| 岳阳市| 肇东市| 屏南县| 锦屏县| 通道| 崇信县| 崇文区| 南投市| 景东| 辉南县| 高平市| 锡林浩特市| 富源县| 万山特区| 渭南市| 灵川县| 霍山县| 屏边| 团风县| 额济纳旗| 泸水县| 东台市| 仁布县| 安平县| 德格县| 健康| 太仓市| 阳东县|