廉頗老矣,尚能飯否

          java:從技術(shù)到管理

          常用鏈接

          統(tǒng)計(jì)

          最新評(píng)論

          Apache ActiveMQ學(xué)習(xí)筆記【mq的方式有兩種:點(diǎn)到點(diǎn)和發(fā)布/訂閱】

          .簡(jiǎn)介ActiveMQ

          ActiveMQ 是最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)

          .下載ActiveMQ

          首先去http://activemq.apache.org/download.html 下載穩(wěn)定版本4.1.0release

          Download Here

          Description ==Download Link ==PGP Signature file of download

          Binary for Windows== apache-activemq-4.1.0-incubator.zip== incubator-activemq-4.1.0.zip.asc

          Binary-for-Unix/Linux/Cygwin==apache-activemq-4.1.0-incubator.tar.gz==incubator-activemq-4.1.0.tar.gz.as 

          解壓后目錄如下:

          +bin (windows下面的bat和unix/linux下面的sh)

          +conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)

          +data (默認(rèn)是空的)

          +docs (index,replease版本里面沒有文檔,-.-b不知道為啥不帶)

          +example (幾個(gè)例子

          +lib (activemMQ使用到的lib)

          -apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

          -LICENSE.txt

          -NOTICE.txt

          -README.txt

          -user-guide.html

          .啟動(dòng)ActiveMQ

          可以使用bin\activemq.bat(activemq) 啟動(dòng),如果一切順利,你就會(huì)看見類似下面的信息(此處解壓到D盤根目錄下).幾個(gè)小提示

          1. 這個(gè)僅僅是最基礎(chǔ)的ActiveMQ的配置,很多地方都沒有配置因此不要直接使用這個(gè)配置用于生產(chǎn)系統(tǒng)

          2. 有的時(shí)候由于端口被占用,導(dǎo)致ActiveMQ錯(cuò)誤,ActiveMQ可能需要以下端口1099(JMX),61616(默認(rèn)的TransportConnector)

          3. 如果沒有物理網(wǎng)卡,或者M(jìn)S的LoopBackAdpater Multicast會(huì)報(bào)一個(gè)錯(cuò)誤

          .監(jiān)控ActiveMQ

          啟動(dòng)JDK自帶的java控制臺(tái)查看程序查看客戶端(如C:\jdk1.6.0_07\bin\jconsole.exe)

          遠(yuǎn)程進(jìn)程:127.0.0.1:1099

          . 測(cè)試ActiveMQ

          由于ActiveMQ是一個(gè)獨(dú)立的jms provider,所以我們不需要其他任何第三方服務(wù)器就可以馬上做我們的測(cè)試了.編譯example目錄下面的程序ProducerTool/ConsumerTool 是JMS參考里面提到的典型應(yīng)用,Producer產(chǎn)生消息,Consumer消費(fèi)消息,而且這個(gè)例子還可以加入?yún)?shù)幫助你測(cè)試剛才啟動(dòng)的本地ActiveMQ或者是遠(yuǎn)程的ActiveMQ

          ProducerTool [url] broker的地址,默認(rèn)的是tcp://localhost:61616

          [true|flase] 是否使用topic,默認(rèn)是false

          [subject] subject的名字,默認(rèn)是TOOL.DEFAULT

          [durabl] 是否持久化消息,默認(rèn)是false

          [messagecount] 發(fā)送消息數(shù)量,默認(rèn)是10

          [messagesize] 消息長(zhǎng)度,默認(rèn)是255

          [clientID] durable為true的時(shí)候,需要配置clientID

          [timeToLive] 消息存活時(shí)間

          [sleepTime] 發(fā)送消息中間的休眠時(shí)間

          [transacte] 是否采用事務(wù)

          ConsumerTool [url] broker的地址,默認(rèn)的是tcp://localhost:61616

          [true|flase] 是否使用topic,默認(rèn)是false

          [subject] subject的名字,默認(rèn)是TOOL.DEFAULT

          [durabl] 是否持久化消息,默認(rèn)是false

          [maxiumMessages] 接受最大消息數(shù)量,0表示不限制

          [clientID] durable為true的時(shí)候,需要配置clientID

          [transacte] 是否采用事務(wù)

          [sleepTime] 接受消息中間的休眠時(shí)間,默認(rèn)是0,onMeesage方法不休眠

          [receiveTimeOut] 接受超時(shí)

          .使用應(yīng)用程序發(fā)送點(diǎn)到點(diǎn)消息隊(duì)列

          TestSender類

          package test;

          import javax.jms.DeliveryMode;

          import javax.jms.JMSException;

          import javax.jms.Message;

          import javax.jms.Session;

          import org.springframework.context.ApplicationContext;

          import org.springframework.context.support.FileSystemXmlApplicationContext;

          import org.springframework.jms.core.JmsTemplate;

          import org.springframework.jms.core.MessageCreator;

          public class TestSender {

              public static void main(String[] args) {

                  ApplicationContext ctx = new FileSystemXmlApplicationContext("conf/applicationContext.xml");

                  JmsTemplate template = (JmsTemplate) ctx.getBean("myJmsTemplate");

                  template.setDeliveryMode(DeliveryMode.PERSISTENT);

                  template.send(new MessageCreator() {

                      public Message createMessage(Session session) throws JMSException {

                          Message message = session.createTextMessage();

                          message.setStringProperty("name", "wangwu");

                          message.setStringProperty("password", "ww");

                          System.out.println("send success");

                          return message;

                      }

                  });

              }

          }

          applicationContext.xml配置文件

          <beans xmlns="http://www.springframework.org/schema/beans"

              xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

              xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

            http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

              <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"

                  destroy-method="stop">

                  <property name="connectionFactory">

                      <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                          <property name="brokerURL" value="tcp://127.0.0.1:61616" />

                      </bean>

                  </property>

              </bean>

              <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">

                  <property name="defaultDestinationName" value="Hello.Queue" />

                  <property name="connectionFactory">

                      <ref local="jmsFactory" />

                  </property>

              </bean>

          </beans>

          . 使用應(yīng)用程序接受點(diǎn)到點(diǎn)消息隊(duì)列

          ExampleListener類

          package test;

          import javax.jms.JMSException;

          import javax.jms.MapMessage;

          import javax.jms.Message;

          import javax.jms.ObjectMessage;

          import javax.jms.Session;

          import javax.jms.StreamMessage;

          import javax.jms.TextMessage;

           

          import org.springframework.jms.listener.SessionAwareMessageListener;

          public class ExampleListener implements SessionAwareMessageListener {

              public void onMessage(Message message, Session session) throws JMSException {

                  if(message instanceof TextMessage) {

                      System.out.println("TextMessage begin");

                      System.out.println("name = " + message.getStringProperty("name"));

                      System.out.println("password = " +message.getStringProperty("password"));

                  }

                  if (message instanceof ObjectMessage) {

                      System.out.println("ObjectMessage");

                  } else if (message instanceof TextMessage) {

                      System.out.println("TextMessage");

                  } else if (message instanceof StreamMessage) {

                      System.out.println("StreamMessage");

                  } else if (message instanceof MapMessage) {

                      System.out.println("MapMessage");

                  }

              }

          }

          TestReceiver類

          package test;

          import javax.jms.JMSException;

          import org.springframework.context.support.FileSystemXmlApplicationContext;

          public class TestReceiver {

              public static void main(String[] args) throws JMSException {

                  new FileSystemXmlApplicationContext("conf/context.xml");

              }

          }

          context.xml配置文件.

          <beans xmlns="http://www.springframework.org/schema/beans"

              xmlns:amq="http://activemq.org/config/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

              xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

            http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

              <bean id="ExampleListener" class="test.ExampleListener" />

              <bean id="destinationa" class="org.apache.activemq.command.ActiveMQQueue">

                  <constructor-arg>

                      <value>Hello.Queue</value>

                  </constructor-arg>

              </bean>

              <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"

                  destroy-method="stop">

                  <property name="connectionFactory">

                      <bean class="org.apache.activemq.ActiveMQConnectionFactory">

                          <property name="brokerURL" value="tcp://127.0.0.1:61616" />

                      </bean>

                  </property>

              </bean>

              <bean id="listenerContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">

                  <property name="concurrentConsumers" value="1" />

                  <property name="connectionFactory" ref="jmsFactory" />

                  <property name="destination" ref="destinationa" />

                  <property name="messageListener" ref="ExampleListener" />

              </bean>

          </beans>

          . 使用WEB程序發(fā)送點(diǎn)到點(diǎn)消息隊(duì)列

          TestWebSender類

          package com.jl.material.adapter.impl;

          import javax.jms.JMSException;

          import com.jl.framework.jms.JmsManager;

          import com.jl.framework.jms.JmsManagerFactory;

          import com.jl.framework.jms.MessageCreateable;

          import com.jl.framework.jms.MessageCreatorDefault;

          import com.jl.framework.jms.util.JmsUtil;

          import com.jl.material.util.MaterialConstants;

          public class TestWebSender{

              public void sender(String name, String password) {

                  MessageCreateable mc = new MessageCreatorDefault();       

                  mc.setStringProperty("name", name);

                  mc.setStringProperty("password", password);       

                  JmsManagerFactory jmsManagerFactory = new JmsManagerFactory();

                  JmsManager jmsTXManager = jmsManagerFactory.createJmsManager(MaterialConstants.MATERIAL_MODULE_NAME);

                  try {

                      jmsTXManager.send(JmsUtil.getDestinationFromConfig("quality_synVerifyBatch_queue_M2Q"), mc);

                      jmsTXManager.commit();

                  } catch (JMSException e) {

                      jmsTXManager.rollback();

                      throw new RuntimeException(e);

                  }

              }

          }

          . 使用WEB程序接受點(diǎn)到點(diǎn)消息隊(duì)列

          TestWebReceiver類

          package com.jl.material.adapter.impl;

          import javax.jms.MapMessage;

          import javax.jms.ObjectMessage;

          import javax.jms.StreamMessage;

          import javax.jms.TextMessage;

          import com.jl.framework.jms.util.support.JMSCallbackable;

          public class TestWebReceiver implements JMSCallbackable {

              public void mdCallback(Object TextMessage_textMessage) {}

              public void logJMSMessageInfo(String arg0) {}

              public void mdCallback(ObjectMessage arg0) throws Exception {}

              public void mdCallback(TextMessage textMessage) throws Exception {

                  String name = textMessage.getStringProperty("name");

                  String password = textMessage.getStringProperty("password");

                  System.out.println("name = " + name);

                  System.out.println("password = " + password);

              }

              public void mdCallback(StreamMessage arg0) throws Exception {}

              public void mdCallback(MapMessage arg0) throws Exception {}

          }

          test_JMS_Spring_Listener.xml配置文件

          <?xml version="1.0" encoding="UTF-8"?>

          <beans>

              <bean id="test_testWebReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">

                 <property name="jmsCallbackable">

                     <bean class="com.jl.material.adapter.impl.TestWebReceiver "/>

                 </property>

              </bean>

              <bean id="test_testWebReceiver _queue" class="org.apache.activemq.command.ActiveMQQueue">

                 <constructor-arg>

                     <value>${quality_assayVerifyBatch_queue_Q2M}</value>

                 </constructor-arg>

              </bean>

           

              <bean id="test_listenerContainerA"

                 class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">

                  <property name="concurrentConsumers" value="1" />

                 <property name="connectionFactory" ref="jmsRecieveFactory" />

                 <property name="destination" ref=" test_testWebReceiver _queue ">

                 <property name="messageListener" ref=" test_testWebReceiver " />

                 <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>

              </bean>   

          </beans>

          jms.properties資源文件

          jms.sendip=tcp://10.1.1.40:61616,tcp://activemq:61616

          jms.listenip=tcp://10.1.1.40:61616,tcp://activemq:61616

          quality_assayVerifyBatch_queue_Q2M=

          Quality_Assay_VerifyBatch_Q2M.Queue

          context.xml配置文件

          <?xml version='1.0' encoding='utf-8'?>

          <Context path="/identity" reloadable="false">

              <Environment name="jms/jms.sendip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />

              <Environment name="jms/jms.listenip" value="tcp://10.1.1.40:61616,tcp://activemq:61616" type="java.lang.String" />

          </Context>

          十. 使用發(fā)布/訂閱方式
          使用該方式的發(fā)布方基本與點(diǎn)到點(diǎn)方式一樣,區(qū)別只在隊(duì)列名的后綴從 .Queue 變成了 .Topic
          區(qū)別主要在接收方配置文件
          <bean id="pound_removeBindingInfoReceiver" class="com.jl.framework.jms.util.support.JMSRecieveBean">
              <property name="jmsCallbackable">
                  <bean class="com.jl.pound.adapter.impl.RemoveBindingInfoReceiver">
                   <property name="initialInfomationService" ref="initialInfomationService" />
                   <property name="productPoundServiceFacade" ref="productPoundServiceFacade" />
                  </bean>
              </property>
           </bean>

           <bean id="pound_removeBindingInfomation_topic" class="org.apache.activemq.command.ActiveMQTopic">
             <constructor-arg><value>${CraftPound_RemoveBindingInfomation}</value></constructor-arg>
           </bean>

           <bean id="pound_listenerContainerB"
               class="com.jl.framework.jms.util.listener.JLMessageListenerContainer">
                <property name="concurrentConsumers" value="1"/>
                <property name="connectionFactory" ref="jmsRecieveFactory" />
                <property name="destination" ref="pound_removeBindingInfomation_topic" />
                <property name="messageListener" ref="pound_removeBindingInfoReceiver" />
                <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"></property>
                <property name="subscriptionDurable" value="true"></property>
                <property name="pubSubDomain" value="true"></property>
                <!-- <property name="clientId" value="100554"></property> -->
                <property name="clientId" ref="generateClientIdentityCode1"></property>
           </bean>

           <bean id="generateClientIdentityCode1"
             class="com.jl.pound.security.GenerateClientIdentityCode">
             <property name="prefix" value="1"></property>
           </bean>

          import org.apache.log4j.Logger;
          import org.springframework.beans.factory.FactoryBean;

          import com.**.util.NetworkUtils;

          public class GenerateClientIdentityCode implements FactoryBean {

              /**
               * LOGGER for sample classify delegate
               */
              private static final Logger LOGGER = Logger.getLogger(GenerateClientIdentityCode.class);
             
              /**
               * prefix
               */
              private String prefix;

           /**
            * @return the prefix
            */
           public String getPrefix() {
            return prefix;
           }

           /**
            * @param prefix the prefix to set
            */
           public void setPrefix(String prefix) {
            this.prefix = prefix;
           }
           
              public String generateIdentityCode() {
                  String mac = NetworkUtils.getMACAddress();
                  mac = prefix + mac;
                  LOGGER.info("this client identity code is:" + mac);
                  return mac;
              }

           public Object getObject() throws Exception {
            return generateIdentityCode();
           }

           public Class getObjectType() {
            return String.class;
           }

           public boolean isSingleton() {
            return false;
           }
          }

          在JMS中,Topic實(shí)現(xiàn)publish和subscribe語義。一條消息被publish時(shí),它將發(fā)到所有感興趣的訂閱者,所以零到多個(gè)subscriber將接收到消息的一個(gè)拷貝。但是在消息代理接收到消息時(shí),只有激活訂閱的subscriber能夠獲得消息的一個(gè)拷貝。

          JMS Queue執(zhí)行l(wèi)oad balancer語義。一條消息僅能被一個(gè)consumer收到。如果在message發(fā)送的時(shí)候沒有可用的consumer,那么它將被保存一直到能處理該message的consumer可用。如果一個(gè)consumer收到一條message后卻不響應(yīng)它,那么這條消息將被轉(zhuǎn)到另一個(gè)consumer那兒。一個(gè)Queue可以有很多consumer,并且在多個(gè)可用的consumer中負(fù)載均衡

          可以使用queue方式發(fā)送注冊(cè)郵件 好友動(dòng)態(tài)數(shù)據(jù)等

          <一>表說明:
          當(dāng)在啟動(dòng)ActiveMQ時(shí),先判斷表是否存在,如果不存在,將去創(chuàng)建表,如下:
          (1)ACTIVEMQ_ACKS:持久訂閱者列表
          1.CONTAINER:類型://主題
          如:topic://basicInfo.topic
          2.SUB_DEST:應(yīng)該是描述,與1內(nèi)容相同
          3.CLIENT_ID:持久訂閱者的標(biāo)志ID,必須唯一
          4.SUB_NAME:持久訂閱者的名稱.(durableSubscriptionName)
          5.SELECTOR:消息選擇器,consumer可以選擇自己想要的
          6.LAST_ACKED_ID:最后一次確認(rèn)ID,這個(gè)字段存的該該訂閱者最后一次收到的消息的ID

          (2)ACTIVEMQ_LOCK:進(jìn)行數(shù)據(jù)訪問的排斥鎖
          1.ID:值為1
          2.TIME:時(shí)間
          3.BROKER_NAME:broker的名稱
             這個(gè)表似為集群使用,但現(xiàn)在ActiveMQ并不能共享數(shù)據(jù)庫.

          (3)ACTIVEMQ_MSGS:存儲(chǔ)Queue和Topic消息的表
          1.ID:消息的ID
          2.CONTAINER: 類型://主題
          如:queue://my.queue
          Topic://basicInfo.topic
          3.MSGID_PROD:發(fā)送消息者的標(biāo)志
          MSGID_PROD =ID:[computerName][…..]
          注意computerName,不要使用中文,消息對(duì)象中會(huì)存儲(chǔ)這個(gè)部分,解析connectID時(shí)會(huì)出現(xiàn)Bad String錯(cuò)誤.
          4.MSGID_SEQ:還不知用處
          5.EXPIRATION:到期時(shí)間.
          6.MSG:消息本身,Blob類型.
          可以在JmsTemplate發(fā)送配置中,加上<property name=”timeToLive” value=”432000000”/>,5天的生命期,如果消息一直沒有被處理,消息會(huì)被刪除,但是表中會(huì)存在CONTAINER為queue://ActiveMQ.DLQ的記錄.也就是說,相當(dāng)于將過期的消息發(fā)給了一個(gè)ActiveMQ自定義的刪除隊(duì)列..

          <二>關(guān)于ActiveMQ的持久訂閱消息刪除操作
          1.主題消息只有一條,所有訂閱了這個(gè)消息的持久訂閱者都要收到消息,只有所有訂閱者收到消息并確認(rèn)(Acknowledge)之后.才會(huì)刪除.
          說明:ActiveMQ支持批量(optimizeAcknowledge為true)確認(rèn),以提高性能
          2.ActiveMQ執(zhí)行刪除Topic消息的cleanup()操作的時(shí)間間隔為5 minutes..



          柳德才
          13691193654
          18942949207
          QQ:422157370
          liudecai_zan@126.com
          湖北-武漢-江夏-廟山

          posted on 2009-04-08 11:18 liudecai_zan@126.com 閱讀(18468) 評(píng)論(3)  編輯  收藏 所屬分類: 程序人生

          評(píng)論

          # re: Apache ActiveMQ學(xué)習(xí)筆記【mq的方式有兩種:點(diǎn)到點(diǎn)和發(fā)布/訂閱】 2011-07-01 11:16 easy518網(wǎng)址導(dǎo)航

          http://www.easy518.com  回復(fù)  更多評(píng)論   

          # re: Apache ActiveMQ學(xué)習(xí)筆記【mq的方式有兩種:點(diǎn)到點(diǎn)和發(fā)布/訂閱】 2012-08-17 11:40 geek

          好文章  回復(fù)  更多評(píng)論   

          # re: Apache ActiveMQ學(xué)習(xí)筆記【mq的方式有兩種:點(diǎn)到點(diǎn)和發(fā)布/訂閱】 2016-08-03 13:32 zcf

          大神 ActiveMQ支不支持大消息拆分呢,求解釋  回復(fù)  更多評(píng)論   

          主站蜘蛛池模板: 出国| 晋宁县| 顺平县| 阿拉善左旗| 玉屏| 三亚市| 沙坪坝区| 墨竹工卡县| 诏安县| 高密市| 乌拉特前旗| 太和县| 巨野县| 石台县| 化隆| 亚东县| 衡阳县| 余姚市| 阜平县| 凯里市| 海城市| 南川市| 曲周县| 宁阳县| 阿图什市| 社旗县| 镇原县| 淮阳县| 博乐市| 安福县| 长顺县| 比如县| 赞皇县| 抚松县| 新乡县| 沙坪坝区| 临桂县| 卢氏县| 五常市| 抚松县| 普安县|