posts - 42,comments - 83,trackbacks - 0
                  前段時(shí)間,有同事跟我說客戶那邊有很多狀態(tài)為receive的message,這些message只有在JMS Server或weblogic Server充啟之后才能被消費(fèi)。經(jīng)過調(diào)查后,這個(gè)問題可能是weblogic的一個(gè)bug,當(dāng)然也不排除跟具體環(huán)境有關(guān)的可能。下面我們來看看問題的根本原因是什么,這種分析有助我們更進(jìn)一步理解weblogic JMS的實(shí)現(xiàn)。

                  首先我們看一下什么是receive,receive表示一個(gè)message已經(jīng)被consumer消費(fèi),但服務(wù)端還沒有關(guān)于這個(gè)message的ack,所以消息不能從queue中刪除, 由于queue中的消息是point-2-point的,所以某個(gè)消息被標(biāo)為receive后,這個(gè)消息自然不能被其他consumer消費(fèi)。那么這個(gè)ack由誰負(fù)責(zé)發(fā)送給Server呢,什么時(shí)候發(fā)送呢?這些都由我們創(chuàng)建JMS Session時(shí)使用的Ack_mode決定,典型的ack-mode有如下兩種:
            auto-ack: 自動(dòng)響應(yīng)模式,consumer.receive()調(diào)用后,如果服務(wù)器端發(fā)現(xiàn)有可用的message,消息返回到客戶端JMS實(shí)現(xiàn)層,在消息返回給客戶前,由weblogic client(JMSSession.getAsyncMessageForConsumer(),異步接受,比如MessageListener,或JMSSession.receiveMessage(),同步接受)層實(shí)現(xiàn)直接調(diào)用acknowledge()通知服務(wù)器端,服務(wù)器端收到ack后,它會(huì)負(fù)責(zé)負(fù)責(zé)將處于receive的message從物理queue中刪除。
                  client-ack: 客戶響應(yīng)模式,consumer.receive()調(diào)用后,客戶端收到消息后,客戶端程序決定什么時(shí)候發(fā)送ack,可以在消息后立即發(fā)送,也可以在消息處理成功后發(fā)送,ack的發(fā)送通過message.acknowledge()實(shí)現(xiàn)。后面的過程和auto-ack相同。

            初看這個(gè)問題,感覺是ack沒有收到,那么什么情況下會(huì)出現(xiàn)ack丟失呢?網(wǎng)絡(luò)問題? 那么客戶端或服務(wù)器端的server log應(yīng)該能夠看到異常,客戶堅(jiān)持說沒有任何異常。有點(diǎn)不可思議,要了客戶的代碼,他們沒有代碼,實(shí)際上他們的應(yīng)用是基于Spring Framework的,通過簡(jiǎn)單的配置來實(shí)現(xiàn)他們的業(yè)務(wù)需要,看了下Spring的相關(guān)代碼,客戶之所以說沒有異常,因?yàn)镾pring catch了服務(wù)器端返回的JMSException,并吃掉了這個(gè)異常(即異常沒有打印出來),這個(gè)異常輸出是可以通過Spring的配置來實(shí)現(xiàn)。客戶配置后,給了我具體的異常,如下:
          java.lang.IllegalArgumentException: Delay is negative.
                  at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:388)
                  at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:340)
                  at weblogic.messaging.kernel.internal.ReceiveRequestImpl.<init>(ReceiveRequestImp l.java:98)
                  at weblogic.messaging.kernel.internal.QueueImpl.receive(QueueImpl.java:820)
                  at weblogic.jms.backend.BEConsumerImpl.blockingReceiveStart(BEConsumerImpl.java:1 172)
                  at weblogic.jms.backend.BEConsumerImpl.receive(BEConsumerImpl.java:1383)
                  at weblogic.jms.backend.BEConsumerImpl.invoke(BEConsumerImpl.java:1088)
                  at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
                  at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsyncInternal(DispatcherI mpl.java:129)
                  at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsync(DispatcherImpl.java :112)
                  at weblogic.messaging.dispatcher.Request.dispatchAsync(Request.java:1046)
                  at weblogic.jms.dispatcher.Request.dispatchAsync(Request.java:72)
                  at weblogic.jms.frontend.FEConsumer.receive(FEConsumer.java:557)
                  at weblogic.jms.frontend.FEConsumer.invoke(FEConsumer.java:806)
                  at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)
                  at weblogic.messaging.dispatcher.DispatcherServerRef.invoke(DispatcherServerRef.j ava:276)
                  at weblogic.messaging.dispatcher.DispatcherServerRef.handleRequest(DispatcherServ erRef.java:141)
                  at weblogic.messaging.dispatcher.DispatcherServerRef.access$000(DispatcherServerR ef.java:36)
                  at weblogic.messaging.dispatcher.DispatcherServerRef$2.run(DispatcherServerRef.ja va:112)
                  at weblogic.work.ExecuteThread.execute(ExecuteThread.java:209)
                  at weblogic.work.ExecuteThread.run(ExecuteThread.java:181)

            現(xiàn)在我們看一下Weblogic JMS的receive的基本流程,看看這個(gè)exception為什么會(huì)被拋出來。
            JMSConsumer.receive(long timewait),客戶端發(fā)起receive請(qǐng)求,其中timewait可有可無,不做指定的話,說明沒有可用消息到達(dá)的話,我們會(huì)一直等下去。如要不作等待的話,可以使用receiveNoWait()。receive()中會(huì)檢查timeout值,如果沒有指定timeout,那么Long.maxValue會(huì)被設(shè)定成這個(gè)timeout,如果timeout小于0,客戶端將會(huì)收到Invalid Timeout異常,接下來請(qǐng)求會(huì)被delegate到JMSSession。
                  |
                  JMSSession.receiveMessage(consumer,timeout),這里timeout會(huì)被重新計(jì)算,然后我們會(huì)創(chuàng)建一個(gè)FEConsumerReceiveRequest對(duì)象。這個(gè)對(duì)象中包含計(jì)算后的timeout,計(jì)算后的timeout應(yīng)該是個(gè)非負(fù)值(上面的異常就是這里的計(jì)算導(dǎo)致的,至于為什么客戶指定的timeout為1,計(jì)算后的timeout變成了負(fù)數(shù),從而導(dǎo)致上面的異常,從代碼層面,看不出有什么問題)。FEConsumerReceiveRequest對(duì)象創(chuàng)建后,由JMS FrontEnd Dispatcher負(fù)責(zé)把請(qǐng)求交給后端的JMS Server,Dispatcher是Weblogic JMS中用于負(fù)責(zé)請(qǐng)求傳輸?shù)模蕾囉赗JVM layer,這里不做贅述。
                  |     
                  RJVM layer, 負(fù)責(zé)RMI socket層的數(shù)據(jù)發(fā)送   
                  |        
                  FEConsumer.receive(invocableRequest),RJVM層處理完socket數(shù)據(jù)后,請(qǐng)求會(huì)被轉(zhuǎn)給JMSConsumer,JMSConsumer通過狀態(tài)機(jī)(state machine)來控制請(qǐng)求處理,沒有過多的邏輯,它會(huì)基于收到的receive request創(chuàng)建一個(gè)BEConsumerReceiveRequest對(duì)象,然后把這個(gè)請(qǐng)求通過JMS BackEnd Dispatcher轉(zhuǎn)發(fā)給BEConsumerImpl。之所以存在FrontEnd /BackEnd Dispatcher,主要考慮到處理請(qǐng)求的server和queue所在的不是同一server。
                  |
                  BEConsumerImpl.receive(request),request進(jìn)入BEConsumerImpl后,它也通過state machine來控制請(qǐng)求處理,下面兩個(gè)方法在調(diào)用過程中被順序調(diào)用,
                  BEConsumerImpl.blockingReceiveStart(request),這里首先檢查timeout值,然后調(diào)用QueueImpl.receive(...)從queue中獲取message,receive()的具體參數(shù)如下,包括timeout, expression(即檢查條件,我們定義的message selector就在其中)。
                  BEConsumerImpl.blockingReceiveProcessMessage(request)
                  BEConsumerImpl.blockingReceiveComplete(request)
                  |
                  QueueImpl.receive(expression,count,acknowledge,owner,timeout,started,userBlob),這里除了狀態(tài)檢查,沒有其他邏輯,它會(huì)根據(jù)傳進(jìn)來的參數(shù),初始化一個(gè)ReceiveRequestImpl對(duì)象。
                  |
                  ReceiveRequestImpl.new(),這個(gè)new代表ReceiveRequestImpl的構(gòu)造函數(shù)。
                  |
                  QueueImpl.get(...),如果timeout = 0,即如果客戶調(diào)用的是receiveNoWait的話,我們直接去通過QueueImpl.get(...),如果沒有match的message,那么直接將新建request的result設(shè)定為no result,否則將match的message設(shè)定為result。
                  QueueImpl.addReader(receiveRequestImpl),如果timeout != 0,我們會(huì)在ReceiveRequestImpl.start()中調(diào)用QueueImpl.addReader(),addReader()中同樣會(huì)通過QueueImpl.get()檢查是否有match的message,如果找到相應(yīng)的message,我們會(huì)把message reference狀態(tài)改為receive。
                  TimerManagerImpl.schedule(timeout),如果QueueImpl.addReader()中的QueueImpl.get()沒有找到相應(yīng)的message,我們需要等待(依據(jù)客戶指定的timeout),這個(gè)等待通過timer去實(shí)現(xiàn),如下:
                          timer = timerManager.schedule(this, timeout);
            指定的timeout到達(dá)后,如果和沒有可用的message,no result將被返回。從上面的異常堆棧來看,問題就出在這里,如果timeout為負(fù)數(shù),timerMangerImpl在啟動(dòng)trigger的時(shí)候,會(huì)拋出如下的runtimeException,
              java.lang.IllegalArgumentException: Delay is negative.
                  
            也許你會(huì)疑問,這沒什么問題吧,timerTrigger只有在沒有message的時(shí)候才會(huì)被schedule,既然沒有message,那有談何狀態(tài)receive message?沒錯(cuò),起timerTrigger之前我們的確沒有修改message狀態(tài),但你注意到?jīng)]有,我們?cè)谄餿imerTrigger前,把receiveRequestImpl加入到QueueImpl去了,但我們?cè)谂龅絀llegalArgumentException時(shí)并沒有把這個(gè)receiveRequestImpl從QueueImpl中刪除,問題就在這里。

           1   synchronized void addReader(Reader reader) throws KernelException {
           2             
           3     List list = get(..);
           4     int newCount;
           5     if (list != null) {
           6             
           7     } else {
           8       reader.incrementReserveCount(-reservedCount);
           9       newCount = reader.getCount();
          10     }
          11     if (newCount > 0) {
          12       logger.debug("Adding consumer to reader list");
          13       readerList.add(reader);
          14     }
          15   }

            如果我們不把receiveRequestImpl從QueueImpl的readerList中刪除,那么如果過一會(huì)有message sender發(fā)送一條和我們上述請(qǐng)求match的message到這個(gè)queue。weblogic收到這個(gè)message后,它會(huì)檢查readerList,如果這個(gè)message match某個(gè)reader,我們會(huì)把message狀態(tài)改成receive,當(dāng)由于IllegalArgumentException,客戶端收到它的時(shí)候,客戶端會(huì)close JMSSession,也就是說這個(gè)消息雖然有reader,但無法deliver到客戶端。

            我們?cè)賮砜纯碬eblogic JMS sender的相關(guān)流程,
            QueueImpl.messageSendComplete(),消息發(fā)送過程結(jié)束后(比如涉及store的話,消息此時(shí)已經(jīng)被存儲(chǔ)),到這一步的話,我們會(huì)調(diào)整系統(tǒng)接受的消息數(shù),然后通過makeMessageAvailable()把消息標(biāo)成visiable或deliver給正在等待的reader。
                  QueueImpl.makeMessageAvailable(),它會(huì)直接調(diào)用match()去檢查readerList中是否存在正在等待它的reader。
                  QueueImpl.match(),它通過finderReader()從readerList中檢查reader,如果有符合條件的reader,它會(huì)把這個(gè)message標(biāo)志為receive,同時(shí)把這個(gè)message挪到pending list中去。

            前面我們說了,雖然reader還在,但與之對(duì)應(yīng)的JMSConsumer已經(jīng)被close,所以這個(gè)消息根本就無法deliver出去,自然就不會(huì)有ack從客戶端返回了,這個(gè)消息也只能一直pending了。

            這個(gè)問題可能是個(gè)bug,目前還在確認(rèn)之中,但我同時(shí)也在和客戶溝通,可能跟他的環(huán)境有一定關(guān)系(比如NTP時(shí)間同步問題)。

            雖然這個(gè)問題能引發(fā)message pending,但并不是所有的message pending問題都是由它應(yīng)起的。網(wǎng)絡(luò)問題也能引發(fā)類似問題,具體問題具體分析,主要的參考客戶端的JMSException。位于receive后的狀態(tài)是transation,也就是如果發(fā)現(xiàn)狀態(tài)為transaction的message的話,一般而言,是這個(gè)消息要么就是發(fā)送還沒結(jié)束,要么就是消息正處于一個(gè)delete的事務(wù)單元中,這里就不再一一羅列了。

          posted on 2009-06-17 09:07 走走停停又三年 閱讀(3886) 評(píng)論(9)  編輯  收藏 所屬分類: Weblogic

          FeedBack:
          # re: 關(guān)于JMS Message Pending的問題
          2009-06-16 20:46 | sunnycare
          最近項(xiàng)目中正好用到了JMS。
          看了你的blog,有兩個(gè)問題問下:
          1.如何重現(xiàn)這個(gè)問題?
          2.既然沒有patch出來,那么有什么別的方式避免這個(gè)問題?  回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-06-16 21:16 | 走走停停又三年
          這個(gè)問題基本很難重現(xiàn),原因很可能跟系統(tǒng)環(huán)境有關(guān)系。weblogic在JMSSession中計(jì)算timeout的時(shí)候,參考了System.currentTimeMills(),如果系統(tǒng)起了NTP client定期做時(shí)間同步的話,可能會(huì)在計(jì)算的時(shí)候引起負(fù)值。如果真跟系統(tǒng)時(shí)間有關(guān),那么最好的做法就是保證客戶端運(yùn)行期間,不要做系統(tǒng)時(shí)間同步。

          另外一個(gè)客戶端回避的方法就是客戶端使用receiveNoWait()來代替receive()或receive(long timeout)。  回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-06-17 18:26 | sunnycare
          # re: 關(guān)于JMS Message Pending的問題
          2009-06-19 12:33 | 找個(gè)美女做老婆
          JBOSS 和 SPRING 的JMS 用過,WEBLOGIC的還沒有用過

          Java樂園 技術(shù)交流社區(qū):http://www.javaly.cn
          Java樂園 群號(hào):15651281
          驗(yàn)證消息 : Java樂園  回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-08-03 11:22 | ybb
          我這里碰到一個(gè)奇詭的問題:

          環(huán)境 jdk1.6, weblogic 10.3

          配置了一個(gè) TestQueue。

          當(dāng)我們使用 MDB方式接收Queue消息時(shí),如果拋出運(yùn)行時(shí)異常,這個(gè)消息會(huì)自動(dòng)重發(fā)。

          如果客戶端是采用consumer messageListener 的形式接收,當(dāng) listener.onMessage() 拋出運(yùn)行時(shí)異常時(shí),這個(gè)消息的status String 一直是 receive 狀態(tài),只有重啟客戶端才會(huì)重新收到一次。
          代碼如下:
          QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageConsumer receiver = session.createConsumer(sq);
          receiver.setMessageListener(new Receiver());

          難道m(xù)essageListener形式不能像MDB一樣的自動(dòng)重發(fā)?

          找了很多資料,一點(diǎn)思路都沒了。


            回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-08-03 11:55 | 走走停停又三年
          什么樣的runtime exception呢? 有可能是你的acknowledege可能由于網(wǎng)絡(luò)問題沒有發(fā)過去,導(dǎo)致message處于receive,所以不會(huì)重新發(fā)送。建議你用client_acknowledge,等你消息成功處理后直接調(diào)用message.acknowledge()。  回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-08-03 13:26 | ybb
          運(yùn)行時(shí)異常,比如拋出空指針異常。
          我的網(wǎng)絡(luò)環(huán)境是局域網(wǎng),acknowledege 肯定能收到的。
          所有在客戶端onMessage中拋出運(yùn)行時(shí)異常的message 都是 receive狀態(tài)。

          但在ejb MDB中,如果出現(xiàn)運(yùn)行時(shí)異常,這消息會(huì)等待10、20秒后重發(fā)。

          如果我自行處理 acknowledege,如何讓message自動(dòng)重發(fā)呢?

            回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-08-03 13:30 | ybb
          能否 交流一下?

          我的msn: ybbkd2@hotmail.com , qq:11898620

          期待  回復(fù)  更多評(píng)論
            
          # re: 關(guān)于JMS Message Pending的問題
          2009-09-02 10:00 | 走走停停又三年
          對(duì)于系統(tǒng)時(shí)間問題,我們可以用下面的小程序測(cè)試一下,

          public class JVMTimeTest {

          public static void main(String args[]){
          JVMTimeTest test = new JVMTimeTest();
          test.retriveTime();
          }

          private void retriveTime(){
          long previousTime = -1;
          while(true){
          long currentTime = System.currentTimeMillis();
          System.out.println("currentTime is: " + currentTime);
          if(previousTime > currentTime)
          System.out.println("OS time change is detected! and:" +
          "previousTime: " + previousTime + " " +
          "currentTime: " + currentTime);
          previousTime = currentTime;
          try{
          Thread.currentThread().sleep(100);
          }catch(Exception e){}
          }
          }
          }
            回復(fù)  更多評(píng)論
            
          主站蜘蛛池模板: 洞头县| 安阳县| 嘉祥县| 盘锦市| 承德市| 古浪县| 乐陵市| 西充县| 阿拉善盟| 神木县| 铜梁县| 都兰县| 乌兰县| 杭锦后旗| 灌阳县| 镇坪县| 尼木县| 民勤县| 涟水县| 铁岭市| 阳高县| 黄大仙区| 都江堰市| 施秉县| 蓬莱市| 呼图壁县| 江津市| 隆化县| 江门市| 车险| 永安市| 营口市| 大冶市| 双辽市| 宣恩县| 霍城县| 涿鹿县| 四子王旗| 广丰县| 邵阳县| 彩票|