paulwong

          My Links

          Blog Stats

          常用鏈接

          留言簿(67)

          隨筆分類(1393)

          隨筆檔案(1151)

          文章分類(7)

          文章檔案(10)

          相冊

          收藏夾(2)

          AI

          Develop

          E-BOOK

          Other

          養生

          微服務

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          60天內閱讀排行

          How to implement JMS ReplyTo using SpringBoot

          Request-Response is a message-exchange-pattern. In some cases, a message producer may want the consumers to reply to a message. The JMSReplyTo header indicates which destination, if any, a JMS consumer should reply to. The JMSReplyTo header is set explicitly by the JMS client; its contents will be a javax.jms.Destination object (either Topic or Queue).

          In some cases, the JMS client will want the message consumers to reply to a temporary topic or queue set up by the JMS client. When a JMS message consumer receives a message that includes a JMSReplyTo destination, it can reply using that destination. A JMS consumer is not required to send a reply, but in some JMS applications, clients are programmed to do so.

          For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time.

          For more information, check here.

          Now, let’s jump into the code. In Spring, there are 2 ways to implement this (at least I know of).

          1. Using JMSTemplate
          2. Using Spring Integration

          For demo purpose, I used ActiveMQ. However, you can implement this in other messaging systems like IBM MQ, Rabbit MQ, Tibco EMS, etc. In this demo, I send an ObjectMessage of type Order and reply with a Shipment object.

          Using JMSTemplate

          1. First, we include the required dependencies. Replace the activemq dependency with your messaging system’s jars if not using ActiveMQ

             <dependencies>
                 
            <dependency>
                     
            <groupId>org.springframework.boot</groupId>
                     
            <artifactId>spring-boot-starter-activemq</artifactId>
                 
            </dependency>
                 
            <dependency>
                     
            <groupId>org.apache.activemq.tooling</groupId>
                     
            <artifactId>activemq-junit</artifactId>
                     
            <version>${activemq.version}</version>
                     
            <scope>test</scope>
                 
            </dependency>
                 
            <dependency>
                     
            <groupId>org.springframework.boot</groupId>
                     
            <artifactId>spring-boot-starter-test</artifactId>
                     
            <scope>test</scope>
                 
            </dependency>
             
            </dependencies>
          2. Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.

             spring:
               activemq:
                 broker-url: tcp://localhost:
            61616
                 non-blocking-redelivery: true
                 packages:
                   trust-all: true    
          3. Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
          4. Now spring will do it’s magic and inject all the required Beans as usual :) However, in our code, we need to EnableJms

            import org.springframework.context.annotation.Configuration;
             
            import org.springframework.jms.annotation.EnableJms;

             @EnableJms
             @Configuration
             
            public class ActiveMQConfig {

                 
            public static final String ORDER_QUEUE = "order-queue";
                 
            public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

             }
          5. First, we will configure the Producer

             @Slf4j
             @Service
             
            public class Producer {

                 @Autowired
                 JmsMessagingTemplate jmsMessagingTemplate;

                 @Autowired
                 JmsTemplate jmsTemplate;
                  private Session session;

                  @PostConstruct
                   public void init(){
                     jmsTemplate.setReceiveTimeout(1000L);
                     jmsMessagingTemplate.setJmsTemplate(jmsTemplate);

                     session = jmsMessagingTemplate.getConnectionFactory().createConnection()
                             .createSession(false, Session.AUTO_ACKNOWLEDGE);
                   }

                 
            public Shipment sendWithReply(Order order) throws JMSException {


                     ObjectMessage objectMessage 
            = session.createObjectMessage(order);

                     objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
                     objectMessage.setJMSReplyTo(
            new ActiveMQQueue(ORDER_REPLY_2_QUEUE));
                     objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
                     objectMessage.setJMSExpiration(
            1000L);
                     objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);

                     
            return jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue(ORDER_QUEUE),
                             objectMessage, Shipment.
            class); //this operation seems to be blocking + sync
                 }
             }
          6. Note in the above code that, JmsMessagingTemplate is used instead of JmsTemplatebecause, we are interested in the method convertSendAndReceive. As seen in the method signature, it waits to receive the Shipment object from the consumer.
          7. Next, we can see the Receiver

             @Component
             
            public class Receiver implements SessionAwareMessageListener<Message> {

                 @Override
                 @JmsListener(destination 
            = ORDER_QUEUE)
                 
            public void onMessage(Message message, Session session) throws JMSException {
                     Order order 
            = (Order) ((ActiveMQObjectMessage) message).getObject();
                     Shipment shipment 
            = new Shipment(order.getId(), UUID.randomUUID().toString());

                     
            // done handling the request, now create a response message
                     final ObjectMessage responseMessage = new ActiveMQObjectMessage();
                     responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
                     responseMessage.setObject(shipment);

                     
            // Message sent back to the replyTo address of the income message.
                     final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
                     producer.send(responseMessage);
                 }
             }
          8. Using the javax.jms.Session the javax.jms.MessageProducer is created and used to send the reply message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

          Using Spring Integration

          1. First, we include the required dependencies in addition to the above dependencies

             <dependency>
               
            <groupId>org.springframework.integration</groupId>
               
            <artifactId>spring-integration-jms</artifactId>
             
            </dependency>
          2. Using the default spring.activemq. properties to configure the application with the ActiveMQ. However, you can do this inside a @Configuration class as well.

             spring:
               activemq
            :
                 broker
            -url: tcp://localhost:61616
                 non
            -blocking-redelivery: true
                 packages
            :
                   trust
            -all: true   
          3. Note in the above configuration spring.activemq.packages.trust-all can be changed to spring.activemq.packages.trusted with the appropriate packages.
          4. Next we create the required Beans for the Spring Integration.

             @EnableIntegration
             @IntegrationComponentScan
             @Configuration
             
            public class ActiveMQConfig {

                 
            public static final String ORDER_QUEUE = "order-queue";
                 
            public static final String ORDER_REPLY_2_QUEUE = "order-reply-2-queue";

                 @Bean
                 
            public MessageConverter messageConverter() {
                     MappingJackson2MessageConverter converter 
            = new MappingJackson2MessageConverter();
                     converter.setTargetType(MessageType.TEXT);
                     converter.setTypeIdPropertyName(
            "_type");
                     
            return converter;
                 }

                 @Bean
                 
            public MessageChannel requests() {
                     
            return new DirectChannel();
                 }

                 @Bean
                 @ServiceActivator(inputChannel 
            = "requests")
                 
            public JmsOutboundGateway jmsGateway(ActiveMQConnectionFactory activeMQConnectionFactory) {
                     JmsOutboundGateway gateway 
            = new JmsOutboundGateway();
                     gateway.setConnectionFactory(activeMQConnectionFactory);
                     gateway.setRequestDestinationName(ORDER_QUEUE);
                     gateway.setReplyDestinationName(ORDER_REPLY_2_QUEUE);
                     gateway.setCorrelationKey(
            "JMSCorrelationID");
                     gateway.setSendTimeout(
            100L);
                     gateway.setReceiveTimeout(
            100L);
                     
            return gateway;
                 }

                 @Autowired
                 Receiver receiver;

                 @Bean
                 
            public DefaultMessageListenerContainer responder(ActiveMQConnectionFactory activeMQConnectionFactory) {
                     DefaultMessageListenerContainer container 
            = new DefaultMessageListenerContainer();
                     container.setConnectionFactory(activeMQConnectionFactory);
                     container.setDestinationName(ORDER_QUEUE);
                     MessageListenerAdapter adapter 
            = new MessageListenerAdapter(new Object() {

                         @SuppressWarnings(
            "unused")
                         
            public Shipment handleMessage(Order order) {
                             
            return receiver.receiveMessage(order);
                         }

                     });
                     container.setMessageListener(adapter);
                     
            return container;
                 }
             }
          5. Next, we will configure the MessagingGateway

             @MessagingGateway(defaultRequestChannel = "requests")
             
            public interface ClientGateway {
                 Shipment sendAndReceive(Order order);
             }
          6. We then Autowire this gateway in our Component class when we want to send and receive the message. A sample is shown below.

             @Slf4j
             @Component
             
            public class Receiver {
                 
            public Shipment receiveMessage(@Payload Order order) {
                     Shipment shipment 
            = new Shipment(order.getId(), UUID.randomUUID().toString());
                     
            return shipment;
                 }
             }
          7. Next we configure the Componen to process the Order message. After successful execution, this component will send the Shipment message to the JMSReplyTo queue. In real life, this receiver could be a different application altogether.

          For those, who just want to clone the code, head out to aniruthmp/jms

          Written on June 5, 2018
          https://aniruthmp.github.io/Spring-JMS-request-response/

          posted on 2019-06-27 09:20 paulwong 閱讀(477) 評論(0)  編輯  收藏 所屬分類: JMS

          主站蜘蛛池模板: 通化市| 康马县| 邛崃市| 永兴县| 金华市| 临清市| 保亭| 松滋市| 司法| 宿迁市| 南昌县| 什邡市| 宜章县| 陕西省| 昌都县| 米林县| 长岭县| 佳木斯市| 嘉兴市| 绥滨县| 黄浦区| 阳江市| 临朐县| 海城市| 大化| 双桥区| 云霄县| 大城县| 余干县| 澎湖县| 射阳县| 贵州省| 繁峙县| 金坛市| 南阳市| 永平县| 祥云县| 南川市| 达州市| 柳河县| 五大连池市|