JMS消息傳遞是一種異步過程,如果沒有接受者JMS Provider應該選擇將消息持久化,策略主要有文件持久化和基于數據庫的持久化兩種,下文是關于將未消費的消息持久化到MySql數據庫的。
首先,我們要做的是將MySql數據庫的驅動包放置到ActiveMQ的安裝目錄下的lib里。
其次,我們需要改寫activemq.xml文件,它在ActiveMQ的conf目錄中。具體修改部分請見下文中粗體部分:
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>

<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data">
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>

<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysqlDataSource"/>
</persistenceAdapter>
<!--
For better performances use VM cursor and small memory limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>

</broker>

<!-- MySql dataSource -->
<bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="hy"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

<!--
Uncomment to enable Camel
Take a look at activemq-camel.xml for more details
<import resource="camel.xml"/>
-->

<!--
Enable web consoles, REST and Ajax APIs and demos
Take a look at activemq-jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>

如上,在persistenceAdapter節點中指定mySql數據源,在broker節點外配置了bean:
<!-- MySql dataSource -->
<bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="hy"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
上面加粗的一句指明了消息存儲在mysql的test數據庫中,當然你可以指定成任何你想要的數據庫,注意在啟動ActiveMq之前這個數據庫是要存在的,否則報錯。另外需要贅述的一點是activemq.xml中不要有中文,看來白狗子對非英語的抵觸真是無處不在,我們所有中國人應該奮發圖強恢復漢唐明榮光,讓異族不得不仰視我們,當然這是后話。
下面就可以啟動ActiveMQ了,向隊列發送幾條消息后,我們可以看看數據庫里發生了什么變化,如下:
上圖是ActiveMQ為了持久化消息而設立的三張表。

上圖是持久化到數據庫的未被消費掉的消息。
如果這些待消費消息被接收后,這張表就會空空如也,如下:
你可以使用
這個程序(注意包要自己導入一下)來測試一下。
以上就是將ActiveMQ中未消費的消息持久化到MySql數據庫的過程,但愿對你有所幫助。