??xml version="1.0" encoding="utf-8" standalone="yes"?>
其中Q红色部分是pȝQ也是启动javaE序加蝲Main函数的classloaderQ?主要的设计考量有以下几点:(x) 1、用自定义的ExtClassLoaderQ加载j(lu)ava的ext目录下的jar包)(j)把程序加载的class完全和系l加载的class隔离开Q这样即使在eclipse容器中启动都不会(x)有类冲突?/p>
Z么不从系l的ExtClassLoader作ؓ(f)自定义classloader数的Ҏ(gu)两个考虑Q第一个是pȝExtClassLoader有可能不存在Q第二个是如果使用同一个ExtClassLoader中,在处理JNDI、XML和URL解析{java扩展功能时会(x)遇到后加载的handler部分D不同classloader?wi)加载的同一个类的ClassCastExceptionQ具体参见这些模块的源代码?/p>
2、WarClassLoader除了(jin)pȝcdCommonc(目前只有log相关c)(j)以外的类都从war包的WEB-INFO和classes下加载?/p>
3、所有执行W(xu)ar包中代码的线EThreadContextClassLoader都设|ؓ(f)WarClassLoaderQ以供Spring和W(xu)ebx中的相关工具cM用这个classloaderl构的后门来加蝲war包中的类Q典型例子是Webx中ResourceLoaderService是使用ContextClassLoader来加载类的?/p>
4、RialtoClassLoader也就是这个项目的容器加蝲器和W(xu)arClassLoader不在同一个树(wi)路径上,可以避免E序使用cdwar使用cȝclass冲突Q典型的是Spring容器相关代码?/p>
5、CommonClassLoader加蝲的类需要严格控Ӟ否则可能?x)导致运行期cdH,例如Spring的相关jar包绝对不可以出现在这个classloader作用范围内?/p>
MQClassLoader采用父分z机Ӟ后来增加的Thread ContextClassLoader在这个体pM增加?jin)一个后门,带来?jin)灵zL,也带来了(jin)很多令h困扰的问题,在做容器cȝ目旉免会(x)遇到class loader层次设计的问题,q里抛砖引玉Q欢q达人拍砖?/p>
在ActiveMQ中,Broker代表一个运行的MQ节点QActiveMQ的插件实际上是基于Broker的一个Filter链,整个设计cM于servlet的Filterl构Q所有的Plugin构成一个链式结构,每个插g实际上都是一?Interceptor"Q类l构囑֦下:(x)
其中Broker接口装?jin)一个AMQ节点的方斚w面的Ҏ(gu)Q包括连接管理、session理、消息的发送和接收以及(qing)其它的一些功能,BrokerFilter实现q个接口Qƈ提供?jin)链式结构支持,可以拦截所有BrokerҎ(gu)的实现ƈ传递结果给铑ּl构的下一个,形成?jin)一个完整的"职责?模式Q具体层ơ关pd下,其中Q?System Plugin"是指AMQ内部使用Plugin机制实现的一些系l功能,用户不能定制Q?AMQ Plugin"指的是ActiveMQ已经实现好了(jin)Q可以在配置文g中自由选择的一些插Ӟ例如单的安全插gQJAAS安全插g和DLQ插g{等Q用h件就是指用户自己实现的amq插gQ需要用h相关jar包放入到amq的启动classpath中,q在配置文g中进行配|才能正加载的插g?/p>
在上面这个层ơ结构中Q最下面的RegionBroker是核?j)组Ӟ在其之上的都是Broker的插Ӟl承之于BrokerFilterQ和Broker保持接口兼容但是扩展Broker的功能?/p>
下面举一个简单的例子Q具体说明一下AMQ的插件是如何工作的?/p>
我们在用AMQ的过E中发现Q在试环境l护斚w有很大的ȝ(ch)Q具体表现在很多同学在测试项目的时候往往只关注自己项目牵涉的队列Q不?x)去消费其?不相?的队列,q样D的一个问题就是ActiveMQl常发生大量数据dQ导致测试环境不可用Q媄(jing)响相关项目的试工作。ؓ(f)?jin)避免这个问题,我们假定在测试环境可以定义以下一些限制条Ӟ(x) 1?所有队列堆U消息不过1000条,过之后立即清除?/p>
2?消息过1个小时没有消费,q接过期?/p>
我们可以~写一个简单的amq插g来完成这两个限制条gQ?/p>
首先Q编写一个插件安装类Q?/p>
package com.alibaba.napoli.plugins; import org.apache.activemq.broker.Broker;
public class MessageControlBrokerPlugin implements BrokerPlugin {
public Broker installPlugin(Broker broker) throws Exception {
其次Q编写真正的插g实现Q?/p>
package com.alibaba.napoli.plugins; import java.io.IOException; import org.apache.activemq.broker.Broker;
/**
public MessageControlBroker(Broker next) {
@Override
Message msg = null;
/**
}
} 然后Q将q两个类打包为myplugin.jarQƈ攑֜activemq启动目录下的lib目录?/p>
最后,在activemq.xml文g中增加一个简单的spring配置:(x) <bean xmlns="
id="purgePlugin"
然后Q重启activemqQ就?x)发现这个插件已l被加蝲?/p>
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
log.info("install MessageControlBrokerPlugin");
return new MessageControlBroker(broker);
}
}
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
* 开发环境管理插ӞW合两个条gq行消息清理Q?lt;br>
* 1 消息累积过1000?
* 2 消息过1个小时无人消?
* @author guolin.zhuanggl
*
*/
public class MessageControlBroker extends BrokerFilter {
public static Log log = LogFactory.getLog(DiscardingDLQBroker.class);
private static final long DEFAULT_EXPIRATION = 3600*1000;
private static final long DEFAULT_PURGE_COUNT = 1000;
super(next);
}
public void messageExpired(ConnectionContext context,
MessageReference message) {
try {
msg = message.getMessage();
} catch (IOException e) {
log.error("failed to fetch content: ",e);
}
purgeMessage(msg);
// TODO Auto-generated method stub
super.messageExpired(context, message);
}
* 清除队列中的所有消?
*/
private void purgeMessage(Message message){
Destination r = message.getRegionDestination();
if(r instanceof Queue){
try {
//如果累积消息过1000个,清除队列消息
if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){
((Queue) r).purge();
}
} catch (Exception e) {
// TODO Auto-generated catch block
log.error("failed to purge queue "+r.getName(),e);
}
}
/**
* 当消息发送时Q全部设|过期时?个小Ӟ试环境专用Q!Q?
*/
@Override
public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception {
long oldExp = messageSend.getExpiration();
messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION );
purgeMessage(messageSend);
super.send(producerExchange, messageSend);
}
class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
</bean>
是一个实时监控工P使用?
java agent
?
jvm attach
技术,可以在不停机的情况下实时监控U上E序的运行情况,另外Q对
btrace
脚本Q实际上是
java
E序Q做?jin)非怸格的安全限制Q安全性很高,对应用程序基本没有媄(jing)响。在性能斚wQ?
cobar
q行q测试,Ҏ(gu)法进行调用耗时l计的时候,基本消费在微U别,可以说微不道?
【背景?/span>
在中文站
napoli
上线q程后,发现?jin)一个奇怪的现象Q尽?已知"?
offer
发送端都已l迁Ud
napoli
pȝ中,但是老的
mq
pȝ仍然有新?
offer
消息q来Q因?
mq
的服务器非常多,定位消息来源成了(jin)一个非常大的问题。这U情况,惛_?jin)?
BTrace
在某一台服务器q行U上监控q而期望发现这个幽c(din)?
【过E?/span>
首先Q我们需要知道两个基本信息:(x)消息cd和来?
ip
Q这h可以定位
offer
消息的来源?
要知道来?
ip
Q需要找到服务器?
理的类Q只有在建立
socket
的地方,才可以抓到具?
ip
Q经q分?
amq
代码Q发?
tcp
q接基本是由下面q个cL服务所有消息的接收的:(x)
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { private static final Log LOG = LogFactory.getLog(TcpTransport.class); private static final ThreadPoolExecutor SOCKET_CLOSE; protected final URI remoteLocation; protected final URI localLocation; protected final WireFormat wireFormat; protected int connectionTimeout = 30000; protected int soTimeout; protected int socketBufferSize = 64 * 1024; protected int ioBufferSize = 8 * 1024; protected boolean closeAsync=true;
protected Socket socket;
|
q个cM包含一?
socket
对象的成员变量,所有我们只要监?
readCommand
Ҏ(gu)Q这个方法的q回值实际上是一?
ActivemqObjectMessage
对象Q这样就可以在一个方法上加拦截器可以同时捕获到
ip
和消息对象,两全其美Q!Q?
protected Object readCommand() throws IOException { return wireFormat.unmarshal(dataIn); } |
因ؓ(f)原有
ESB
消息通道都是一个队?
ESBQueue
Q所以无法通过队列名称来确定消息类型,必须通过
ESBTransferObject
对象来取得消息类型:(x)
destType
Q?
offer
的区间是
1000-1008
public class ESBTransferObject implements Serializable {
private static final long serialVersionUID = -5975115234845303878L; /** * 消息体,原则上对象序列化后的XML数据(String) 注意使用XML1.1规范?/p> */ private Object content; /** * 用户自定义数?/p> */ private Object userDefineData; /** * 目的消息cd */
private int destType = -1;
|
但是Q在服务器端q没?
ESBTransferObject
对象Q无法反序列化(
BTrace
也不支持反序列化操作Q,所以没有方法简单取得消息类型信息!Q!
OK
Q我不反序列化,直接拿二q制
byte[]
Q类型信息应该是在固定位|的吧?但是发现q个对象
content
变长字符串定义在cd之前Q类型位|不定?jin),晕倒啊
不死?j),输出二进制数据,x花明啊,原来对象序列化的时候,
primitive
?
field
都是紧接着cd信息写入的,所以,cd信息是在固定位置?
Q类型信息始l是
255
Q?
256
两个字节Q实际上?
4
个字节,但是目前我们只占?
2
个)(j)
Ok
Q编写代码,试环境q行一下,晕倒,竟然有数l溢出!
使用
BTrace
Q把q个数组打印下来Q这个需要点技巧,
btrace
q?
for
都不允许Q,竟然发现
位置偏移?/span>
205
Q?
206
位置
Q这个真的不知道什么原因,估计是客L(fng)发送的时候压~了(jin)Q简单修改偏U量Q测试运行,
ok
Q所有的消息cd?
ip
的对照表打印出来?jin)?
package com.alibaba.btrace.script;
import static com.sun.btrace.BTraceUtils.*;
import com.sun.btrace.annotations.*;
@BTrace
public class AMQQueue2IP {
@OnMethod(clazz = "org.apache.activemq.transport.tcp.TcpTransport", //需要拦截的cd
method = "readCommand", //需要拦截的Ҏ(gu)?/p>
location = @Location(Kind.RETURN)) //拦截位置Q方法返回时
public static void onTransportCommandExit(@Self Object transport, @Return Object command) { //捕获调用对象和返回?/p>
String commandName = str(command);
boolean isObjectMessage = (indexOf(commandName, "org.apache.activemq.command.ActiveMQObjectMessage") >= 0);
if (isObjectMessage) {
Object msg = command;
Object content = get(field(getSuperclass(getSuperclass(classOf(msg))), "content", false), msg);//捕获消息内容byte[]
byte[] bs = (byte[]) get(field(classOf(content), "data", false), content);
if (bs.length >= 206) {
int off = getInt(field(classOf(content), "offset", false), content);
int code = (0xff00&bs[205]<<8)+(0xff&bs[206]); //转换205,206字节为消息类?/p>
//println(str(code));
Object socket = get(field(classOf(transport), "socket"), transport);
String address = str(socket); //截取ip地址
int s = indexOf(address, "/");
int e = indexOf(address, ",");
int len = e - s;
String ip = substr(address, s + 1, e);
print(strcat(timestamp(),"---"));
println(strcat(strcat("ip: ", ip), strcat(" queueName: ", str(code))));
}
}
}
}
打印l果Q?/span>
2/3/10 12:38 PM---ip: 172.22.2.34 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.41 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.22 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.47 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.31 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.13 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.6 queueName: 5001
2/3/10 12:38 PM---ip: 172.22.2.48 queueName: 2001
2/3/10 12:38 PM---ip: 172.22.2.39 queueName: 2001
【补充?/span>
BTrace 是一个强大的工具Q但是,在线上检的时候考虑时效性和安全性,必须有一个经q检验的脚本库才可以安全?qing)时的定位系l问?
但是Qؓ(f)什么文本文件会(x)被当成是二进制文Ӟ和今天的DNS切换有什么关p?分析后发玎ͼ因ؓ(f)dns的问题,抓数据的脚本执行旉明显变长Q这P在文件还在写入的时候,监控脚本开始读取数据文Ӟ在这L(fng)q发讉K下,grep?x)认己正在访问一个binary文gQ导致监控误报警?/p>
调试代码发现Q?/p>
在Activemq的send response处理中,使用?jin)一个BlockingQueueQ在有timeout的方法里Q用了(jin)pollҎ(gu)Q这个方法的api说明中指出,当timeout发生Ӟq个Ҏ(gu)q回nullQ!Q?/p>
我们在看AMQl过层层调用后,在ActiveMQConnectionҎ(gu)中如何处理这个返回|(x)
对返回gؓ(f)I的情况没有做Q何处理,即消息发送超Ӟamq也认个消息发送成功!估计q哥们理解poll在timeout的时候会(x)抛出异常吧?/p>
解决办法很简单,在response为空的时候,抛出JMSExceptionQ告知发生Timeout错误?/p>