往事如風(fēng)
          記錄工作中的點(diǎn)點(diǎn)滴滴 留住那些淡淡的回憶
          posts - 6,  comments - 3,  trackbacks - 0

          ActiveMQ是一個(gè)流行的開(kāi)源MQ,我們也大規(guī)模應(yīng)用在網(wǎng)站的方方面面,每天處理上億消息,取得了較好效果。ActiveMQ有一個(gè)很好很強(qiáng)大的插件體系,提供了很強(qiáng)的擴(kuò)展能力,ActiveMQ本身就是使用這一套插件體系實(shí)現(xiàn)了很多擴(kuò)展功能,包括他的權(quán)限管理,日志管理,事務(wù)等模塊都是作為一個(gè)插件集成的,我們自己也在消息路由、補(bǔ)償式事務(wù)方面使用了它的插件功能,確實(shí)非常方便。

          在ActiveMQ中,Broker代表一個(gè)運(yùn)行的MQ節(jié)點(diǎn),ActiveMQ的插件實(shí)際上是基于Broker的一個(gè)Filter鏈,整個(gè)設(shè)計(jì)類(lèi)似于servlet的Filter結(jié)構(gòu),所有的Plugin構(gòu)成一個(gè)鏈?zhǔn)浇Y(jié)構(gòu),每個(gè)插件實(shí)際上都是一個(gè)"Interceptor",類(lèi)結(jié)構(gòu)圖如下:

          Main.jpg

          其中Broker接口封裝了一個(gè)AMQ節(jié)點(diǎn)的方方面面的方法,包括連接管理、session管理、消息的發(fā)送和接收以及其它的一些功能,BrokerFilter實(shí)現(xiàn)這個(gè)接口,并提供了鏈?zhǔn)浇Y(jié)構(gòu)支持,可以攔截所有Broker方法的實(shí)現(xiàn)并傳遞結(jié)果給鏈?zhǔn)浇Y(jié)構(gòu)的下一個(gè),形成了一個(gè)完整的"職責(zé)鏈"模式,具體層次關(guān)系如下,其中,"System Plugin"是指AMQ內(nèi)部使用Plugin機(jī)制實(shí)現(xiàn)的一些系統(tǒng)功能,用戶不能定制,"AMQ Plugin"指的是ActiveMQ已經(jīng)實(shí)現(xiàn)好了,可以在配置文件中自由選擇的一些插件,例如簡(jiǎn)單的安全插件,JAAS安全插件和DLQ插件等等,用戶插件就是指用戶自己實(shí)現(xiàn)的amq插件,需要用戶把相關(guān)jar包放入到amq的啟動(dòng)classpath中,并在配置文件中進(jìn)行配置才能正確加載的插件。

          圖片1.jpg

          在上面這個(gè)層次結(jié)構(gòu)中,最下面的RegionBroker是核心組件,在其之上的都是Broker的插件,繼承之于BrokerFilter,和Broker保持接口兼容但是擴(kuò)展Broker的功能。

          下面舉一個(gè)簡(jiǎn)單的例子,具體說(shuō)明一下AMQ的插件是如何工作的。

          我們?cè)谑褂肁MQ的過(guò)程中發(fā)現(xiàn),在測(cè)試環(huán)境維護(hù)方面有很大的麻煩,具體表現(xiàn)在很多同學(xué)在測(cè)試項(xiàng)目的時(shí)候往往只關(guān)注自己項(xiàng)目牽涉的隊(duì)列,不會(huì)去消費(fèi)其他"不相關(guān)"的隊(duì)列,這樣導(dǎo)致的一個(gè)問(wèn)題就是ActiveMQ經(jīng)常發(fā)生大量數(shù)據(jù)阻塞,導(dǎo)致測(cè)試環(huán)境不可用,影響相關(guān)項(xiàng)目的測(cè)試工作。為了避免這個(gè)問(wèn)題,我們假定在測(cè)試環(huán)境可以定義以下一些限制條件:

          1、 所有隊(duì)列堆積消息不超過(guò)1000條,超過(guò)之后立即清除。

          2、 消息超過(guò)1個(gè)小時(shí)沒(méi)有消費(fèi),就直接過(guò)期。

          我們可以編寫(xiě)一個(gè)簡(jiǎn)單的amq插件來(lái)完成這兩個(gè)限制條件:

          首先,編寫(xiě)一個(gè)插件安裝類(lèi):

          package com.alibaba.napoli.plugins;

          import org.apache.activemq.broker.Broker;
          import org.apache.activemq.broker.BrokerPlugin;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;

          public class MessageControlBrokerPlugin implements BrokerPlugin {
          private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);

          public Broker installPlugin(Broker broker) throws Exception {
          log.info("install MessageControlBrokerPlugin");
          return new MessageControlBroker(broker);
          }
          }

          其次,編寫(xiě)真正的插件實(shí)現(xiàn):

          package com.alibaba.napoli.plugins;

          import java.io.IOException;

          import org.apache.activemq.broker.Broker;
          import org.apache.activemq.broker.BrokerFilter;
          import org.apache.activemq.broker.ConnectionContext;
          import org.apache.activemq.broker.ProducerBrokerExchange;
          import org.apache.activemq.broker.region.Destination;
          import org.apache.activemq.broker.region.MessageReference;
          import org.apache.activemq.broker.region.Queue;
          import org.apache.activemq.command.Message;
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;

          /**
          * 開(kāi)發(fā)環(huán)境管理插件,符合兩個(gè)條件進(jìn)行消息清理:<br>
          * 1 消息累積超過(guò)1000條
          * 2 消息超過(guò)1個(gè)小時(shí)無(wú)人消費(fèi)
          * @author guolin.zhuanggl
          *
          */
          public class MessageControlBroker extends BrokerFilter {
          public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
          private static final long DEFAULT_EXPIRATION = 3600*1000;
          private static final long DEFAULT_PURGE_COUNT = 1000;

          public MessageControlBroker(Broker next) {
          super(next);
          }

          @Override
          public void messageExpired(ConnectionContext context,
          MessageReference message) {

          Message msg = null;
          try {
          msg = message.getMessage();
          } catch (IOException e) {
          log.error("failed to fetch content: ",e);
          }
          purgeMessage(msg);
          // TODO Auto-generated method stub
          super.messageExpired(context, message);
          }

          /**
          * 清除隊(duì)列中的所有消息
          */
          private void purgeMessage(Message message){
          Destination r = message.getRegionDestination();
          if(r instanceof Queue){
          try {
          //如果累積消息超過(guò)1000個(gè),清除隊(duì)列消息
          if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
          ((Queue) r).purge();
          }
          } catch (Exception e) {
          // TODO Auto-generated catch block
          log.error("failed to purge queue "+r.getName(),e);
          }
          }

          }
          /**
          * 當(dāng)消息發(fā)送時(shí),全部設(shè)置過(guò)期時(shí)間1個(gè)小時(shí),測(cè)試環(huán)境專(zhuān)用!!!
          */
          @Override
          public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
          long oldExp = messageSend.getExpiration();
          messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
          purgeMessage(messageSend);
          super.send(producerExchange, messageSend);
          }

          }

          然后,將這兩個(gè)類(lèi)打包為myplugin.jar,并放在activemq啟動(dòng)目錄下的lib目錄下

          最后,在activemq.xml文件中增加一個(gè)簡(jiǎn)單的spring配置項(xiàng):

          <bean xmlns=" id="purgePlugin"
          class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
          </bean>

          然后,重啟activemq,就會(huì)發(fā)現(xiàn)這個(gè)插件已經(jīng)被加載。


          FeedBack:
          # re: ActiveMQ的插件開(kāi)發(fā)介紹[未登錄](méi)
          2010-07-21 16:04 |
          你這個(gè)例子測(cè)試過(guò)嗎  回復(fù)  更多評(píng)論
            
          # re: ActiveMQ的插件開(kāi)發(fā)介紹[未登錄](méi)
          2010-07-21 16:05 |
          if(r instanceof Queue)
          r 不是這個(gè)實(shí)例,程序有問(wèn)題吧
            回復(fù)  更多評(píng)論
            
          # re: ActiveMQ的插件開(kāi)發(fā)介紹[未登錄](méi)
          2010-07-21 16:18 |
          這個(gè)r 是空值,這個(gè)方法不行,樓主在仔細(xì)看看  回復(fù)  更多評(píng)論
            

          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           

          <2010年7月>
          27282930123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          常用鏈接

          留言簿

          隨筆分類(lèi)

          隨筆檔案

          搜索

          •  

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 麟游县| 治县。| 巴马| 固镇县| 松江区| 宽城| 集安市| 淮滨县| 威远县| 巴中市| 汶川县| 北流市| 犍为县| 渑池县| 射阳县| 玛纳斯县| 凤城市| 阿合奇县| 阳江市| 潜山县| 威信县| 偃师市| 昌江| 安溪县| 萨迦县| 海盐县| 囊谦县| 衢州市| 临澧县| 仁怀市| 哈密市| 山东| 包头市| 新密市| 会泽县| 和政县| 津南区| 曲麻莱县| 清苑县| 嘉荫县| 商都县|