往事如風(fēng)
          記錄工作中的點點滴滴 留住那些淡淡的回憶
          posts - 6,  comments - 3,  trackbacks - 0

          ActiveMQ是一個流行的開源MQ,我們也大規(guī)模應(yīng)用在網(wǎng)站的方方面面,每天處理上億消息,取得了較好效果。ActiveMQ有一個很好很強大的插件體系,提供了很強的擴(kuò)展能力,ActiveMQ本身就是使用這一套插件體系實現(xiàn)了很多擴(kuò)展功能,包括他的權(quán)限管理,日志管理,事務(wù)等模塊都是作為一個插件集成的,我們自己也在消息路由、補償式事務(wù)方面使用了它的插件功能,確實非常方便。

          在ActiveMQ中,Broker代表一個運行的MQ節(jié)點,ActiveMQ的插件實際上是基于Broker的一個Filter鏈,整個設(shè)計類似于servlet的Filter結(jié)構(gòu),所有的Plugin構(gòu)成一個鏈?zhǔn)浇Y(jié)構(gòu),每個插件實際上都是一個"Interceptor",類結(jié)構(gòu)圖如下:

          Main.jpg

          其中Broker接口封裝了一個AMQ節(jié)點的方方面面的方法,包括連接管理、session管理、消息的發(fā)送和接收以及其它的一些功能,BrokerFilter實現(xiàn)這個接口,并提供了鏈?zhǔn)浇Y(jié)構(gòu)支持,可以攔截所有Broker方法的實現(xiàn)并傳遞結(jié)果給鏈?zhǔn)浇Y(jié)構(gòu)的下一個,形成了一個完整的"職責(zé)鏈"模式,具體層次關(guān)系如下,其中,"System Plugin"是指AMQ內(nèi)部使用Plugin機(jī)制實現(xiàn)的一些系統(tǒng)功能,用戶不能定制,"AMQ Plugin"指的是ActiveMQ已經(jīng)實現(xiàn)好了,可以在配置文件中自由選擇的一些插件,例如簡單的安全插件,JAAS安全插件和DLQ插件等等,用戶插件就是指用戶自己實現(xiàn)的amq插件,需要用戶把相關(guān)jar包放入到amq的啟動classpath中,并在配置文件中進(jìn)行配置才能正確加載的插件。

          圖片1.jpg

          在上面這個層次結(jié)構(gòu)中,最下面的RegionBroker是核心組件,在其之上的都是Broker的插件,繼承之于BrokerFilter,和Broker保持接口兼容但是擴(kuò)展Broker的功能。

          下面舉一個簡單的例子,具體說明一下AMQ的插件是如何工作的。

          我們在使用AMQ的過程中發(fā)現(xiàn),在測試環(huán)境維護(hù)方面有很大的麻煩,具體表現(xiàn)在很多同學(xué)在測試項目的時候往往只關(guān)注自己項目牽涉的隊列,不會去消費其他"不相關(guān)"的隊列,這樣導(dǎo)致的一個問題就是ActiveMQ經(jīng)常發(fā)生大量數(shù)據(jù)阻塞,導(dǎo)致測試環(huán)境不可用,影響相關(guān)項目的測試工作。為了避免這個問題,我們假定在測試環(huán)境可以定義以下一些限制條件:

          1、 所有隊列堆積消息不超過1000條,超過之后立即清除。

          2、 消息超過1個小時沒有消費,就直接過期。

          我們可以編寫一個簡單的amq插件來完成這兩個限制條件:

          首先,編寫一個插件安裝類:

          package com.alibaba.napoli.plugins;

          import org.apache.activemq.broker.Broker;
          import org.apache.activemq.broker.BrokerPlugin;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;

          public class MessageControlBrokerPlugin implements BrokerPlugin {
          private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);

          public Broker installPlugin(Broker broker) throws Exception {
          log.info("install MessageControlBrokerPlugin");
          return new MessageControlBroker(broker);
          }
          }

          其次,編寫真正的插件實現(xiàn):

          package com.alibaba.napoli.plugins;

          import java.io.IOException;

          import org.apache.activemq.broker.Broker;
          import org.apache.activemq.broker.BrokerFilter;
          import org.apache.activemq.broker.ConnectionContext;
          import org.apache.activemq.broker.ProducerBrokerExchange;
          import org.apache.activemq.broker.region.Destination;
          import org.apache.activemq.broker.region.MessageReference;
          import org.apache.activemq.broker.region.Queue;
          import org.apache.activemq.command.Message;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;

          /**
          * 開發(fā)環(huán)境管理插件,符合兩個條件進(jìn)行消息清理:<br>
          * 1 消息累積超過1000條
          * 2 消息超過1個小時無人消費
          * @author guolin.zhuanggl
          *
          */
          public class MessageControlBroker extends BrokerFilter {
          public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
          private static final long DEFAULT_EXPIRATION = 3600*1000;
          private static final long DEFAULT_PURGE_COUNT = 1000;

          public MessageControlBroker(Broker next) {
          super(next);
          }

          @Override
          public void messageExpired(ConnectionContext context,
          MessageReference message) {

          Message msg = null;
          try {
          msg = message.getMessage();
          } catch (IOException e) {
          log.error("failed to fetch content: ",e);
          }
          purgeMessage(msg);
          // TODO Auto-generated method stub
          super.messageExpired(context, message);
          }

          /**
          * 清除隊列中的所有消息
          */
          private void purgeMessage(Message message){
          Destination r = message.getRegionDestination();
          if(r instanceof Queue){
          try {
          //如果累積消息超過1000個,清除隊列消息
          if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
          ((Queue) r).purge();
          }
          } catch (Exception e) {
          // TODO Auto-generated catch block
          log.error("failed to purge queue "+r.getName(),e);
          }
          }

          }
          /**
          * 當(dāng)消息發(fā)送時,全部設(shè)置過期時間1個小時,測試環(huán)境專用!!!
          */
          @Override
          public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
          long oldExp = messageSend.getExpiration();
          messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
          purgeMessage(messageSend);
          super.send(producerExchange, messageSend);
          }

          }

          然后,將這兩個類打包為myplugin.jar,并放在activemq啟動目錄下的lib目錄下

          最后,在activemq.xml文件中增加一個簡單的spring配置項:

          <bean xmlns=" id="purgePlugin"
          class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
          </bean>

          然后,重啟activemq,就會發(fā)現(xiàn)這個插件已經(jīng)被加載。


          FeedBack:
          # re: ActiveMQ的插件開發(fā)介紹[未登錄]
          2010-07-21 16:04 |
          你這個例子測試過嗎  回復(fù)  更多評論
            
          # re: ActiveMQ的插件開發(fā)介紹[未登錄]
          2010-07-21 16:05 |
          if(r instanceof Queue)
          r 不是這個實例,程序有問題吧
            回復(fù)  更多評論
            
          # re: ActiveMQ的插件開發(fā)介紹[未登錄]
          2010-07-21 16:18 |
          這個r 是空值,這個方法不行,樓主在仔細(xì)看看  回復(fù)  更多評論
            

          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           

          <2010年7月>
          27282930123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 莒南县| 五原县| 禹城市| 绿春县| 双流县| 七台河市| 辽中县| 调兵山市| 东莞市| 泉州市| 石泉县| 黑龙江省| 家居| 芜湖县| 吉首市| 文山县| 嘉黎县| 阿克陶县| 东台市| 盐池县| 宜川县| 静宁县| 健康| 兴仁县| 天等县| 龙山县| 荥阳市| 宁德市| 阿城市| 遵义县| 宁化县| 西盟| 湘乡市| 汝阳县| 和平县| 平泉县| 石城县| 华容县| 丽江市| 乡城县| 故城县|