瘋狂

          STANDING ON THE SHOULDERS OF GIANTS
          posts - 481, comments - 486, trackbacks - 0, articles - 1
            BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

          Spring和 jms

          Posted on 2013-04-11 11:30 瘋狂 閱讀(3652) 評論(0)  編輯  收藏 所屬分類: spring

          主要的幾個(gè)類說明:

          1 JmsTemplate 用于發(fā)送和接受消息。需要消息工廠參數(shù)。

          基于監(jiān)聽:

          2 DefaultMessageListenerContainerSimpleMessageListenerContainer這兩個(gè)容器可以創(chuàng)建多個(gè)session和消費(fèi)者來對每個(gè)隊(duì)列進(jìn)行消息處理并條用消息監(jiān)聽類的方法進(jìn)行處理。并通過多線程進(jìn)行處理。每個(gè)線程通過輪訓(xùn)的方式(while(true))去進(jìn)行消息接收處理。

          3 MessageListenerAdapter 它相當(dāng)于一個(gè)特殊的自定義監(jiān)聽器,里面可以調(diào)用一些消息格式的裝換工具。例如jsonxmlstringbean的轉(zhuǎn)換等等。

           

          其中DefaultMessageListenerContainer接受動態(tài)添加session。而后者不支持。

          ----

          具體消息監(jiān)聽處理源碼過程如下:以DefaultMessageListenerContainer為例:

          ----

          1 初始化建立消費(fèi)者線程:

          /**

               * Creates the specified number of concurrent consumers,

               * in the form of a JMS Session plus associated MessageConsumer.

               * @see #createListenerConsumer

               */

              protected void doInitialize() throws JMSException {

                  establishSharedConnection();

                  initializeConsumers();

              }

          protected void initializeConsumers() throws JMSException {

                  // Register Sessions and MessageConsumers.

                  synchronized (this.consumersMonitor) {

                      if (this.consumers == null) {

                          this.sessions = new HashSet<Session>(this.concurrentConsumers);

                          this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);

                          Connection con = getSharedConnection();

                          for (int i = 0; i < this.concurrentConsumers; i++) {

                              Session session = createSession(con);

                              MessageConsumer consumer = createListenerConsumer(session);

                              this.sessions.add(session);

                              this.consumers.add(consumer);

                          }

                      }

                  }

              }

          2 啟動消費(fèi)者線程

          /**

               * Start the shared Connection, if any, and notify all invoker tasks.

               * @throws JMSException if thrown by JMS API methods

               * @see #startSharedConnection

               */

              protected void doStart() throws JMSException {

                  // Lazily establish a shared Connection, if necessary.

                  if (sharedConnectionEnabled()) {

                      establishSharedConnection();

                  }

           

                  // Reschedule paused tasks, if any.

                  synchronized (this.lifecycleMonitor) {

                      this.running = true;

                      this.lifecycleMonitor.notifyAll();

                      resumePausedTasks();

                  }

           

                  // Start the shared Connection, if any.

                  if (sharedConnectionEnabled()) {

                      startSharedConnection();

                  }

              }

          3消息消費(fèi)

          其中的resumePausedTasks方法會進(jìn)行輪訓(xùn)處理:

          protected void resumePausedTasks() {

                  synchronized (this.lifecycleMonitor) {

                      if (!this.pausedTasks.isEmpty()) {//所有的線程一開始會被置為暫停

                          for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {

                              Object task = it.next();

                              try {

                                  doRescheduleTask(task);

                                  it.remove();

                                  if (logger.isDebugEnabled()) {

                                      logger.debug("Resumed paused task: " + task);

                                  }

                              }

                              catch (RuntimeException ex) {

                                  logRejectedTask(task, ex);

                                  // Keep the task in paused mode...

                              }

                          }

                      }

                  }

          每個(gè)線程是一個(gè) AsyncMessageListenerInvoker,通過它的run方法來進(jìn)行消息處理:

          先執(zhí)行:

              private boolean invokeListener() throws JMSException {

                      initResourcesIfNecessary();

                      boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);

                      this.lastMessageSucceeded = true;

                      return messageReceived;

                  }

          接著:

          Message message = receiveMessage(consumerToUse);

           

          這就是最終的阻塞方法去獲取消息

          /**

               * Receive a message from the given consumer.

               * @param consumer the MessageConsumer to use

               * @return the Message, or <code>null</code> if none

               * @throws JMSException if thrown by JMS methods

               */

              protected Message receiveMessage(MessageConsumer consumer) throws JMSException {

                  return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));

              }

          4調(diào)用我們自定義的監(jiān)聽器:

          其中監(jiān)聽器可以實(shí)現(xiàn)SessionAwareMessageListenerMessageListener接口,不一樣的地方在于前者會把sessionmessage參數(shù)給你,你可以進(jìn)行一個(gè)回調(diào)操作。后者只有消息內(nèi)容message參數(shù)。

          protected void invokeListener(Session session, Message message) throws JMSException {

                  Object listener = getMessageListener();

                  if (listener instanceof SessionAwareMessageListener) {

                      doInvokeListener((SessionAwareMessageListener) listener, session, message);

                  }

                  else if (listener instanceof MessageListener) {

                      doInvokeListener((MessageListener) listener, message);

                  }

                  else if (listener != null) {

                      throw new IllegalArgumentException(

                              "Only MessageListener and SessionAwareMessageListener supported: " + listener);

                  }

                  else {

                      throw new IllegalStateException("No message listener specified - see property 'messageListener'");

                  }

              }

          主站蜘蛛池模板: 福贡县| 霍城县| 沂源县| 米脂县| 灵璧县| 宜兴市| 邛崃市| 穆棱市| 尼玛县| 阿瓦提县| 桃源县| 临沧市| 白朗县| 平塘县| 临夏市| 塘沽区| 凌海市| 肃北| 宁城县| 芜湖县| 固安县| 依安县| 北宁市| 和顺县| 五家渠市| 安顺市| 南京市| 昂仁县| 合肥市| 泸水县| 松江区| 色达县| 清苑县| 湖南省| 兴海县| 棋牌| 楚雄市| 东乡| 康乐县| 紫云| 含山县|