瘋狂

          STANDING ON THE SHOULDERS OF GIANTS
          posts - 481, comments - 486, trackbacks - 0, articles - 1
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          Spring和 jms

          Posted on 2013-04-11 11:30 瘋狂 閱讀(3652) 評論(0)  編輯  收藏 所屬分類: spring

          主要的幾個類說明:

          1 JmsTemplate 用于發送和接受消息。需要消息工廠參數。

          基于監聽:

          2 DefaultMessageListenerContainerSimpleMessageListenerContainer這兩個容器可以創建多個session和消費者來對每個隊列進行消息處理并條用消息監聽類的方法進行處理。并通過多線程進行處理。每個線程通過輪訓的方式(while(true))去進行消息接收處理。

          3 MessageListenerAdapter 它相當于一個特殊的自定義監聽器,里面可以調用一些消息格式的裝換工具。例如jsonxmlstringbean的轉換等等。

           

          其中DefaultMessageListenerContainer接受動態添加session。而后者不支持。

          ----

          具體消息監聽處理源碼過程如下:以DefaultMessageListenerContainer為例:

          ----

          1 初始化建立消費者線程:

          /**

               * Creates the specified number of concurrent consumers,

               * in the form of a JMS Session plus associated MessageConsumer.

               * @see #createListenerConsumer

               */

              protected void doInitialize() throws JMSException {

                  establishSharedConnection();

                  initializeConsumers();

              }

          protected void initializeConsumers() throws JMSException {

                  // Register Sessions and MessageConsumers.

                  synchronized (this.consumersMonitor) {

                      if (this.consumers == null) {

                          this.sessions = new HashSet<Session>(this.concurrentConsumers);

                          this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);

                          Connection con = getSharedConnection();

                          for (int i = 0; i < this.concurrentConsumers; i++) {

                              Session session = createSession(con);

                              MessageConsumer consumer = createListenerConsumer(session);

                              this.sessions.add(session);

                              this.consumers.add(consumer);

                          }

                      }

                  }

              }

          2 啟動消費者線程

          /**

               * Start the shared Connection, if any, and notify all invoker tasks.

               * @throws JMSException if thrown by JMS API methods

               * @see #startSharedConnection

               */

              protected void doStart() throws JMSException {

                  // Lazily establish a shared Connection, if necessary.

                  if (sharedConnectionEnabled()) {

                      establishSharedConnection();

                  }

           

                  // Reschedule paused tasks, if any.

                  synchronized (this.lifecycleMonitor) {

                      this.running = true;

                      this.lifecycleMonitor.notifyAll();

                      resumePausedTasks();

                  }

           

                  // Start the shared Connection, if any.

                  if (sharedConnectionEnabled()) {

                      startSharedConnection();

                  }

              }

          3消息消費

          其中的resumePausedTasks方法會進行輪訓處理:

          protected void resumePausedTasks() {

                  synchronized (this.lifecycleMonitor) {

                      if (!this.pausedTasks.isEmpty()) {//所有的線程一開始會被置為暫停

                          for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) {

                              Object task = it.next();

                              try {

                                  doRescheduleTask(task);

                                  it.remove();

                                  if (logger.isDebugEnabled()) {

                                      logger.debug("Resumed paused task: " + task);

                                  }

                              }

                              catch (RuntimeException ex) {

                                  logRejectedTask(task, ex);

                                  // Keep the task in paused mode...

                              }

                          }

                      }

                  }

          每個線程是一個 AsyncMessageListenerInvoker,通過它的run方法來進行消息處理:

          先執行:

              private boolean invokeListener() throws JMSException {

                      initResourcesIfNecessary();

                      boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);

                      this.lastMessageSucceeded = true;

                      return messageReceived;

                  }

          接著:

          Message message = receiveMessage(consumerToUse);

           

          這就是最終的阻塞方法去獲取消息

          /**

               * Receive a message from the given consumer.

               * @param consumer the MessageConsumer to use

               * @return the Message, or <code>null</code> if none

               * @throws JMSException if thrown by JMS methods

               */

              protected Message receiveMessage(MessageConsumer consumer) throws JMSException {

                  return (this.receiveTimeout < 0 ? consumer.receive() : consumer.receive(this.receiveTimeout));

              }

          4調用我們自定義的監聽器:

          其中監聽器可以實現SessionAwareMessageListenerMessageListener接口,不一樣的地方在于前者會把sessionmessage參數給你,你可以進行一個回調操作。后者只有消息內容message參數。

          protected void invokeListener(Session session, Message message) throws JMSException {

                  Object listener = getMessageListener();

                  if (listener instanceof SessionAwareMessageListener) {

                      doInvokeListener((SessionAwareMessageListener) listener, session, message);

                  }

                  else if (listener instanceof MessageListener) {

                      doInvokeListener((MessageListener) listener, message);

                  }

                  else if (listener != null) {

                      throw new IllegalArgumentException(

                              "Only MessageListener and SessionAwareMessageListener supported: " + listener);

                  }

                  else {

                      throw new IllegalStateException("No message listener specified - see property 'messageListener'");

                  }

              }

          主站蜘蛛池模板: 东海县| 土默特右旗| 收藏| 左贡县| 阿坝县| 灯塔市| 永善县| 新干县| 龙川县| 交城县| 灌云县| 勐海县| 黔东| 吉木萨尔县| 海城市| 天台县| 肇源县| 客服| 永善县| 哈尔滨市| 隆尧县| 黄平县| 浦江县| 顺义区| 郯城县| 都兰县| 德州市| 阿瓦提县| 宜春市| 隆德县| 安庆市| 韶山市| 金昌市| 松原市| 汉寿县| 新宁县| 广饶县| 崇义县| 泰和县| 吉林市| 泰兴市|