ActiveMQ是一個(gè)流行的開(kāi)源MQ,我們也大規(guī)模應(yīng)用在網(wǎng)站的方方面面,每天處理上億消息,取得了較好效果。ActiveMQ有一個(gè)很好很強(qiáng)大的插件體系,提供了很強(qiáng)的擴(kuò)展能力,ActiveMQ本身就是使用這一套插件體系實(shí)現(xiàn)了很多擴(kuò)展功能,包括他的權(quán)限管理,日志管理,事務(wù)等模塊都是作為一個(gè)插件集成的,我們自己也在消息路由、補(bǔ)償式事務(wù)方面使用了它的插件功能,確實(shí)非常方便。
在ActiveMQ中,Broker代表一個(gè)運(yùn)行的MQ節(jié)點(diǎn),ActiveMQ的插件實(shí)際上是基于Broker的一個(gè)Filter鏈,整個(gè)設(shè)計(jì)類(lèi)似于servlet的Filter結(jié)構(gòu),所有的Plugin構(gòu)成一個(gè)鏈?zhǔn)浇Y(jié)構(gòu),每個(gè)插件實(shí)際上都是一個(gè)"Interceptor",類(lèi)結(jié)構(gòu)圖如下:
其中Broker接口封裝了一個(gè)AMQ節(jié)點(diǎn)的方方面面的方法,包括連接管理、session管理、消息的發(fā)送和接收以及其它的一些功能,BrokerFilter實(shí)現(xiàn)這個(gè)接口,并提供了鏈?zhǔn)浇Y(jié)構(gòu)支持,可以攔截所有Broker方法的實(shí)現(xiàn)并傳遞結(jié)果給鏈?zhǔn)浇Y(jié)構(gòu)的下一個(gè),形成了一個(gè)完整的"職責(zé)鏈"模式,具體層次關(guān)系如下,其中,"System Plugin"是指AMQ內(nèi)部使用Plugin機(jī)制實(shí)現(xiàn)的一些系統(tǒng)功能,用戶不能定制,"AMQ Plugin"指的是ActiveMQ已經(jīng)實(shí)現(xiàn)好了,可以在配置文件中自由選擇的一些插件,例如簡(jiǎn)單的安全插件,JAAS安全插件和DLQ插件等等,用戶插件就是指用戶自己實(shí)現(xiàn)的amq插件,需要用戶把相關(guān)jar包放入到amq的啟動(dòng)classpath中,并在配置文件中進(jìn)行配置才能正確加載的插件。
在上面這個(gè)層次結(jié)構(gòu)中,最下面的RegionBroker是核心組件,在其之上的都是Broker的插件,繼承之于BrokerFilter,和Broker保持接口兼容但是擴(kuò)展Broker的功能。
下面舉一個(gè)簡(jiǎn)單的例子,具體說(shuō)明一下AMQ的插件是如何工作的。
我們?cè)谑褂肁MQ的過(guò)程中發(fā)現(xiàn),在測(cè)試環(huán)境維護(hù)方面有很大的麻煩,具體表現(xiàn)在很多同學(xué)在測(cè)試項(xiàng)目的時(shí)候往往只關(guān)注自己項(xiàng)目牽涉的隊(duì)列,不會(huì)去消費(fèi)其他"不相關(guān)"的隊(duì)列,這樣導(dǎo)致的一個(gè)問(wèn)題就是ActiveMQ經(jīng)常發(fā)生大量數(shù)據(jù)阻塞,導(dǎo)致測(cè)試環(huán)境不可用,影響相關(guān)項(xiàng)目的測(cè)試工作。為了避免這個(gè)問(wèn)題,我們假定在測(cè)試環(huán)境可以定義以下一些限制條件:
1、 所有隊(duì)列堆積消息不超過(guò)1000條,超過(guò)之后立即清除。
2、 消息超過(guò)1個(gè)小時(shí)沒(méi)有消費(fèi),就直接過(guò)期。
我們可以編寫(xiě)一個(gè)簡(jiǎn)單的amq插件來(lái)完成這兩個(gè)限制條件:
首先,編寫(xiě)一個(gè)插件安裝類(lèi):
package com.alibaba.napoli.plugins;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageControlBrokerPlugin implements BrokerPlugin {
private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageControlBrokerPlugin");
return new MessageControlBroker(broker);
}
}
其次,編寫(xiě)真正的插件實(shí)現(xiàn):
package com.alibaba.napoli.plugins;
import java.io.IOException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* 開(kāi)發(fā)環(huán)境管理插件,符合兩個(gè)條件進(jìn)行消息清理:<br>
* 1 消息累積超過(guò)1000條
* 2 消息超過(guò)1個(gè)小時(shí)無(wú)人消費(fèi)
* @author guolin.zhuanggl
*
*/
public class MessageControlBroker extends BrokerFilter {
public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
private static final long DEFAULT_EXPIRATION = 3600*1000;
private static final long DEFAULT_PURGE_COUNT = 1000;
public MessageControlBroker(Broker next) {
super(next);
}
@Override
public void messageExpired(ConnectionContext context,
MessageReference message) {
Message msg = null;
try {
msg = message.getMessage();
} catch (IOException e) {
log.error("failed to fetch content: ",e);
}
purgeMessage(msg);
// TODO Auto-generated method stub
super.messageExpired(context, message);
}
/**
* 清除隊(duì)列中的所有消息
*/
private void purgeMessage(Message message){
Destination r = message.getRegionDestination();
if(r instanceof Queue){
try {
//如果累積消息超過(guò)1000個(gè),清除隊(duì)列消息
if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
((Queue) r).purge();
}
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("failed to purge queue "+r.getName(),e);
}
}
}
/**
* 當(dāng)消息發(fā)送時(shí),全部設(shè)置過(guò)期時(shí)間1個(gè)小時(shí),測(cè)試環(huán)境專(zhuān)用!!!
*/
@Override
public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
long oldExp = messageSend.getExpiration();
messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
purgeMessage(messageSend);
super.send(producerExchange, messageSend);
}
}
然后,將這兩個(gè)類(lèi)打包為myplugin.jar,并放在activemq啟動(dòng)目錄下的lib目錄下
最后,在activemq.xml文件中增加一個(gè)簡(jiǎn)單的spring配置項(xiàng):
<bean xmlns="
id="purgePlugin"
class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
</bean>
然后,重啟activemq,就會(huì)發(fā)現(xiàn)這個(gè)插件已經(jīng)被加載。