pzxsheng

          有種相見不敢見的傷痛,有種愛還埋藏在心中

          activeMQ指南針_Queue完整分析

          原文地址:http://www.360doc.com/content/13/0315/16/11765546_271697514.shtml

          在接觸activeMQ的這一段時(shí)間里,我們還是保持開始對(duì)它的態(tài)度,它是個(gè)優(yōu)秀的開源消息中間件。消息中間件是個(gè)非常重要的搭建企業(yè)應(yīng)用系統(tǒng)的重要組件,我們?cè)诓粩嗌钊敕治?span style="font-family: Courier; font-size: 12pt;">activeMQ的過程中,發(fā)現(xiàn)直到5.1這個(gè)版本,都還是存在不少問題,有些是很致命,但正因?yàn)槿绱耍覀兏訄?jiān)定了要全面掌握activeMQ,我們不想重新做“輪子”,但我們要具備在輪子壞了或不好用的情況下,要能獨(dú)立解決碰到的這些問題。下面我們通過分析網(wǎng)友提出的一個(gè)典型的問題場(chǎng)景,來作為我們指南針計(jì)劃的結(jié)束。

          Queue作為activeMQ里面一個(gè)很重要的通訊方式,網(wǎng)友的場(chǎng)景如下:

          測(cè)試queue持久化消息時(shí),發(fā)送接收20W條消息。打開消息消費(fèi)者,連上再斷開,反復(fù)進(jìn)行這步操作,能接收到消息,接收端有時(shí)候會(huì)阻塞,但不能完全接收完20W條消息。(其實(shí)5000條就會(huì)發(fā)生問題,不用20W這么多)

                 相關(guān)背景知識(shí):

                 因?yàn)檫@是5.1版本的一個(gè)非常嚴(yán)重的bug,所以我們會(huì)比較詳細(xì)的進(jìn)行分析。(我們?cè)谧罱K解決問題后,上activeMQ官網(wǎng)上發(fā)現(xiàn)它最新的源碼是解決了該問題的,但這并不影響這個(gè)問題的典型性)。下面我們將從3個(gè)方面來分析:Queue消息的接收和發(fā)送、內(nèi)存使用機(jī)制、消息的審查(audit)、消息在文件中的存儲(chǔ)機(jī)制。

          l         Queue消息的接收和發(fā)送



           

          Queue接收消息并發(fā)給需要的消費(fèi)者,具體過程如下:

          1.  Queue從消息生產(chǎn)者接收消息。

          2.  Queue使用一個(gè)“存儲(chǔ)指針”來接收這些消息。當(dāng)內(nèi)存有空閑區(qū)域時(shí),“存儲(chǔ)指針”把消息放到內(nèi)存中,當(dāng)內(nèi)存不夠時(shí),則把消息們存入磁盤文件。

          3.  當(dāng)有活動(dòng)的(active)的消息消費(fèi)者時(shí),Queue會(huì)首先把“存儲(chǔ)指針”的內(nèi)存中的消息送給消費(fèi)者,當(dāng)內(nèi)存的消息被消費(fèi)掉,則從磁盤文件中再讀入其他的消息(出問題處),直至消息都被消費(fèi)掉了。

          其中最關(guān)鍵的方法是Queue類里的doPageIn()

           

          l         內(nèi)存使用機(jī)制

          activeMQ為了適應(yīng)企業(yè)級(jí)的365*24的使用,在內(nèi)存使用方面非常慎重,任何消息只有在內(nèi)存里有空閑區(qū)域時(shí),才能放到內(nèi)存里,之后才能發(fā)給消費(fèi)者。當(dāng)消息被消費(fèi)者消耗掉了后,確認(rèn)信息會(huì)發(fā)給activeMQQueue接收到這些確認(rèn)消息后,會(huì)把那些被確認(rèn)的消息所占用的內(nèi)存釋放掉。

           

          l         消息的審查(audit)

          為了防止消息的重復(fù)發(fā)送,activeMQ采用了一個(gè)審查機(jī)制,它負(fù)責(zé)審查某條消息是否重復(fù)。它是一個(gè)最近最久未使用算法(LRU)隊(duì)列。每個(gè)隊(duì)列元素它是一個(gè)bit數(shù)組,它的運(yùn)行機(jī)制如下所示:



           

           

                 消息是一個(gè)個(gè)按照順序進(jìn)入bit數(shù)組,具體算法answer = (index - firstIndex) / BitArray.LONG_SIZE,其中:

          BitArray.LONG_SIZE是每個(gè)bit數(shù)組的大小。

          Index是消息的編號(hào)。(它是按照+1順序增加的)

          firstIndex是整個(gè)LRU隊(duì)列的首Index,這個(gè)值會(huì)經(jīng)常變化,因?yàn)楫?dāng)達(dá)到LRU的上限時(shí),老的一批就被清除了,firstIndex += BitArray.LONG_SIZE(出問題處)

           

          l         消息在文件中的存儲(chǔ)機(jī)制

          存放在文件中的消息,它們是按照如下方式進(jìn)行組織的:



           

          每個(gè)消息都知道它的上一個(gè)和下一個(gè)消息,當(dāng)它自身被刪除后,相應(yīng)的關(guān)系會(huì)進(jìn)行調(diào)整。

           

          問題原因分析:

              因?yàn)?/span>activeMQ在編碼實(shí)現(xiàn)的時(shí)候,原本的想法應(yīng)該是這樣的:

          1.  從生產(chǎn)者接收消息,如果Queue有可用的內(nèi)存就放在內(nèi)存中,沒有則存入文件中。

          2.  Queue發(fā)送消息給消費(fèi)者時(shí),先發(fā)送已經(jīng)保存在內(nèi)存中的消息。

          3.  當(dāng)內(nèi)存中消息發(fā)送完后,順序讀入(這里是關(guān)鍵)文件中的消息,通過消息的審查機(jī)制,確認(rèn)不是重復(fù)消息,則放入內(nèi)存中供后續(xù)操作使用。

          但是activeMQ5.1版本的實(shí)現(xiàn),問題就出在第三步的順序讀入。因?yàn)閺奈募凶x入它有個(gè)先決條件,那就是必須要有可用的內(nèi)存,如果沒有可用的話,就放棄本次消息讀入,并且應(yīng)該放棄這次讀取操作。但是5.1版本是繼續(xù)往下讀,這就導(dǎo)致順序錯(cuò)亂,使得當(dāng)內(nèi)存可用的時(shí)候,讀入的消息在進(jìn)行審查的時(shí)候,發(fā)生錯(cuò)誤,錯(cuò)誤認(rèn)為它們是重復(fù)消息。這就導(dǎo)致發(fā)送20W條消息,不能保證完全收到。

           

          解決方案:

          KahaReferenceStore的方法recoverNextMessages里的

          if (entry != null) {

                          int count = 0;

                          do {

                              ReferenceRecord msg = messageContainer.getValue(entry);

                              if (msg != null ) {

                                  if ( recoverReference(listener, msg)) {

                                      count++;

                                      lastBatchId = msg.getMessageId();

                                  }

                              } else {

                                  lastBatchId = null;

                              }

                              batchEntry = entry;

                              entry = messageContainer.getNext(entry);

                          } while (entry != null && count < maxReturned && listener.hasSpace());

                      }

           

          改為

                   if (entry != null) {

                          int count = 0;

                          do {

                              ReferenceRecord msg = messageContainer.getValue(entry);

                              testTheNextMsgId(msg.getMessageId().toString());

                              if (msg != null )

                              {

                                  if ( recoverReference(listener, msg))

                                  {

                                      count++;

                                      lastBatchId = msg.getMessageId();

                                      batchEntry = entry;

                                      entry = messageContainer.getNext(entry);

                                  }

                                  else

                                  {

                                     break;

                                  }

                              }

                              else

                              {

                                  lastBatchId = null;

                                  batchEntry = entry;

                                  entry = messageContainer.getNext(entry);

                              }

                          } while (entry != null && count < maxReturned && listener.hasSpace());

                      }

           

           

          activeMQ指南針計(jì)劃的結(jié)束,但它又是個(gè)新開始,我們通過這個(gè)計(jì)劃收獲了我們想要的東西了,同時(shí)我們不僅為各位朋友答疑解疑,也提供了activemqSpanner這個(gè)工具作為消息網(wǎng)絡(luò)拓?fù)鋱D工具。再一次感謝各位朋友對(duì)我們的信任。

                  現(xiàn)在,我們正式啟動(dòng)activeMQ笑臉計(jì)劃。它的目的不再是給大家提供解決問題的方向,而是直接解決大家碰到的各種問題,給大家?guī)バδ槨K鼘⑹且粋€(gè)長期堅(jiān)持的事情,任何關(guān)于activeMQ使用過程的疑惑、問題、bug、功能改進(jìn),都可以在這個(gè)計(jì)劃里交流。所有在笑臉計(jì)劃中提出的問題、功能改進(jìn)、解決方案,都將完全通過網(wǎng)絡(luò)無償分享給所有人。

          posted on 2013-03-15 16:24 科菱財(cái)神 閱讀(707) 評(píng)論(0)  編輯  收藏 所屬分類: ActiveMQ

          導(dǎo)航

          <2013年3月>
          242526272812
          3456789
          10111213141516
          17181920212223
          24252627282930
          31123456

          統(tǒng)計(jì)

          常用鏈接

          留言簿(1)

          隨筆分類

          隨筆檔案

          搜索

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 二连浩特市| 阿瓦提县| 烟台市| 合江县| 黔东| 靖安县| 静宁县| 厦门市| 通榆县| 绥江县| 阿荣旗| 荥阳市| 基隆市| 莒南县| 和静县| 长子县| 洞头县| 永安市| 鄂托克前旗| 上虞市| 兴和县| 林西县| 蛟河市| 海淀区| 隆尧县| 南汇区| 镇巴县| 德江县| 望奎县| 潞西市| 泗水县| 怀宁县| 梓潼县| 上饶市| 黔西| 河间市| 诏安县| 安仁县| 武山县| 海丰县| 安福县|