ActiveMQ是一個流行的開源MQ,我們也大規(guī)模應(yīng)用在網(wǎng)站的方方面面,每天處理上億消息,取得了較好效果。ActiveMQ有一個很好很強大的插件體系,提供了很強的擴(kuò)展能力,ActiveMQ本身就是使用這一套插件體系實現(xiàn)了很多擴(kuò)展功能,包括他的權(quán)限管理,日志管理,事務(wù)等模塊都是作為一個插件集成的,我們自己也在消息路由、補償式事務(wù)方面使用了它的插件功能,確實非常方便。
在ActiveMQ中,Broker代表一個運行的MQ節(jié)點,ActiveMQ的插件實際上是基于Broker的一個Filter鏈,整個設(shè)計類似于servlet的Filter結(jié)構(gòu),所有的Plugin構(gòu)成一個鏈?zhǔn)浇Y(jié)構(gòu),每個插件實際上都是一個"Interceptor",類結(jié)構(gòu)圖如下:
其中Broker接口封裝了一個AMQ節(jié)點的方方面面的方法,包括連接管理、session管理、消息的發(fā)送和接收以及其它的一些功能,BrokerFilter實現(xiàn)這個接口,并提供了鏈?zhǔn)浇Y(jié)構(gòu)支持,可以攔截所有Broker方法的實現(xiàn)并傳遞結(jié)果給鏈?zhǔn)浇Y(jié)構(gòu)的下一個,形成了一個完整的"職責(zé)鏈"模式,具體層次關(guān)系如下,其中,"System Plugin"是指AMQ內(nèi)部使用Plugin機(jī)制實現(xiàn)的一些系統(tǒng)功能,用戶不能定制,"AMQ Plugin"指的是ActiveMQ已經(jīng)實現(xiàn)好了,可以在配置文件中自由選擇的一些插件,例如簡單的安全插件,JAAS安全插件和DLQ插件等等,用戶插件就是指用戶自己實現(xiàn)的amq插件,需要用戶把相關(guān)jar包放入到amq的啟動classpath中,并在配置文件中進(jìn)行配置才能正確加載的插件。
在上面這個層次結(jié)構(gòu)中,最下面的RegionBroker是核心組件,在其之上的都是Broker的插件,繼承之于BrokerFilter,和Broker保持接口兼容但是擴(kuò)展Broker的功能。
下面舉一個簡單的例子,具體說明一下AMQ的插件是如何工作的。
我們在使用AMQ的過程中發(fā)現(xiàn),在測試環(huán)境維護(hù)方面有很大的麻煩,具體表現(xiàn)在很多同學(xué)在測試項目的時候往往只關(guān)注自己項目牽涉的隊列,不會去消費其他"不相關(guān)"的隊列,這樣導(dǎo)致的一個問題就是ActiveMQ經(jīng)常發(fā)生大量數(shù)據(jù)阻塞,導(dǎo)致測試環(huán)境不可用,影響相關(guān)項目的測試工作。為了避免這個問題,我們假定在測試環(huán)境可以定義以下一些限制條件:
1、 所有隊列堆積消息不超過1000條,超過之后立即清除。
2、 消息超過1個小時沒有消費,就直接過期。
我們可以編寫一個簡單的amq插件來完成這兩個限制條件:
首先,編寫一個插件安裝類:
package com.alibaba.napoli.plugins;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageControlBrokerPlugin implements BrokerPlugin {
private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageControlBrokerPlugin");
return new MessageControlBroker(broker);
}
}
其次,編寫真正的插件實現(xiàn):
package com.alibaba.napoli.plugins;
import java.io.IOException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* 開發(fā)環(huán)境管理插件,符合兩個條件進(jìn)行消息清理:<br>
* 1 消息累積超過1000條
* 2 消息超過1個小時無人消費
* @author guolin.zhuanggl
*
*/
public class MessageControlBroker extends BrokerFilter {
public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
private static final long DEFAULT_EXPIRATION = 3600*1000;
private static final long DEFAULT_PURGE_COUNT = 1000;
public MessageControlBroker(Broker next) {
super(next);
}
@Override
public void messageExpired(ConnectionContext context,
MessageReference message) {
Message msg = null;
try {
msg = message.getMessage();
} catch (IOException e) {
log.error("failed to fetch content: ",e);
}
purgeMessage(msg);
// TODO Auto-generated method stub
super.messageExpired(context, message);
}
/**
* 清除隊列中的所有消息
*/
private void purgeMessage(Message message){
Destination r = message.getRegionDestination();
if(r instanceof Queue){
try {
//如果累積消息超過1000個,清除隊列消息
if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
((Queue) r).purge();
}
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("failed to purge queue "+r.getName(),e);
}
}
}
/**
* 當(dāng)消息發(fā)送時,全部設(shè)置過期時間1個小時,測試環(huán)境專用!!!
*/
@Override
public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
long oldExp = messageSend.getExpiration();
messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
purgeMessage(messageSend);
super.send(producerExchange, messageSend);
}
}
然后,將這兩個類打包為myplugin.jar,并放在activemq啟動目錄下的lib目錄下
最后,在activemq.xml文件中增加一個簡單的spring配置項:
<bean xmlns="
id="purgePlugin"
class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
</bean>
然后,重啟activemq,就會發(fā)現(xiàn)這個插件已經(jīng)被加載。