
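          For various reasons I have been reviewing some material lately, so I went back over JMS as well. The examples below are the result.

          The most important concepts in JMS are destination, acknowledge, subscribe, and durable subscribe.

          destination: topic or queue
          queue: simply put, 1:1. A message can be consumed by only one consumer; by the time another consumer comes for it, it is already gone. First come, first served.
          topic: simply put, 1:N. A message can be consumed by multiple consumers; every subscribed consumer gets its own copy.
          subscribe: taking a topic as the example, if the subscription is not durable, the consumer receives only the messages that the producer publishes after it has subscribed. With a durable subscription, a message that the consumer has not yet consumed is retained and will be delivered to it sooner or later.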
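The delivery rules above can be sketched with a small in-memory model. This is only a toy under stated assumptions: `DeliveryModel`, `ToyQueue`, `ToyTopic`, and their methods are hypothetical names made up for illustration, not part of `javax.jms`, and a real provider also handles acknowledgement, expiry, and persistence, which are ignored here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy in-memory model of the delivery rules described above.
// Hypothetical names throughout; this is NOT the javax.jms API.
public class DeliveryModel {

    // Queue (1:1): a message is handed to whichever consumer asks first.
    static class ToyQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        void send(String message) { pending.add(message); }

        // Returns the next message, or null if none is left.
        String receive() { return pending.poll(); }
    }

    // Topic (1:N): every current subscription gets its own copy.
    static class ToyTopic {
        private final Map<String, List<String>> mailboxes = new HashMap<>();
        private final Set<String> durableNames = new HashSet<>();

        void subscribe(String name, boolean durable) {
            mailboxes.putIfAbsent(name, new ArrayList<>());
            if (durable) {
                durableNames.add(name);
            }
        }

        // A non-durable subscription vanishes on disconnect; a durable one
        // keeps its mailbox and continues to collect messages.
        void disconnect(String name) {
            if (!durableNames.contains(name)) {
                mailboxes.remove(name);
            }
        }

        void publish(String message) {
            for (List<String> box : mailboxes.values()) {
                box.add(message);
            }
        }

        // Returns and clears the messages waiting for this subscription.
        List<String> drain(String name) {
            List<String> box = mailboxes.get(name);
            if (box == null) {
                return new ArrayList<>();
            }
            List<String> out = new ArrayList<>(box);
            box.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("order-1");
        System.out.println("consumer A: " + queue.receive()); // order-1
        System.out.println("consumer B: " + queue.receive()); // null, already taken

        ToyTopic topic = new ToyTopic();
        topic.subscribe("plain", false);
        topic.subscribe("durable", true);
        topic.publish("m1");
        System.out.println("plain: " + topic.drain("plain"));     // [m1]
        System.out.println("durable: " + topic.drain("durable")); // [m1]

        // Both subscribers go offline, a message is published, both return.
        topic.disconnect("plain");
        topic.disconnect("durable");
        topic.publish("m2");
        topic.subscribe("plain", false);
        topic.subscribe("durable", true);
        System.out.println("plain: " + topic.drain("plain"));     // []
        System.out.println("durable: " + topic.drain("durable")); // [m2]
    }
}
```

The last two lines are the point of the durable case: the message published while both subscribers were offline reaches only the durable one.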
           
           
           
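           Below are several examples:
           
           QueueSend: produces messages to a queue
           QueueReceive: consumes messages from a queue
           TopicSend: produces messages to a topic
           TopicReceive1: consumes topic messages with a non-durable subscription
           TopicReceive2: consumes topic messages with a durable subscription
           
           These examples were tested on WebLogic 8.1.4. Of course, the JMS server, connection factory, and destinations must be defined first.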
           
           
           
           QueueSend
           
           import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Queue;
          import javax.jms.QueueConnection;
          import javax.jms.QueueConnectionFactory;
          import javax.jms.QueueSender;
          import javax.jms.QueueSession;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;

          public class QueueSend {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = "t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String QUEUE = "SendJMSQueue";

           private QueueConnectionFactory qconFactory;

           private QueueConnection qcon;

           private QueueSession qsession;

           private QueueSender qsender;

           private Queue queue;

           private TextMessage msg;

           /**
            * Creates all the necessary objects for sending messages to a JMS queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException {
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            qcon = qconFactory.createQueueConnection();
            qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = (Queue) ctx.lookup(queueName);
            qsender = qsession.createSender(queue);
            msg = qsession.createTextMessage();
            qcon.start();
           }

           /**
            * Sends a message to a JMS queue.
            *
            * @param message
            *            message to be sent
            * @exception JMSException
            *                if JMS fails to send message due to internal error
            */
           public void send(String message) throws JMSException {
            msg.setText(message);
            qsender.send(msg);
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            qsender.close();
            qsession.close();
            qcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if operation fails
            */
           public static void main(String[] args) throws Exception {
            try {
             InitialContext ic = getInitialContext();
             QueueSend qs = new QueueSend();
             qs.init(ic, QUEUE);
             readAndSend(qs);
             qs.close();
            } catch (Exception e) {
             e.printStackTrace();
            }
           }

           private static void readAndSend(QueueSend qs) throws IOException,
             JMSException {
            BufferedReader msgStream = new BufferedReader(new InputStreamReader(
              System.in));
            String line = null;
            boolean quitNow = false;
            do {
             System.out.print("Enter message (\"quit\" to quit): ");
             line = msgStream.readLine();
             if (line != null && line.trim().length() != 0) {
              qs.send(line);
              System.out.println("JMS Message Sent: " + line + "\n");
              quitNow = line.equalsIgnoreCase("quit");
             }
            } while (!quitNow);

           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }


          QueueReceive

          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageListener;
          import javax.jms.Queue;
          import javax.jms.QueueConnection;
          import javax.jms.QueueConnectionFactory;
          import javax.jms.QueueReceiver;
          import javax.jms.QueueSession;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;

          public class QueueReceive implements MessageListener {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = "t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String QUEUE = "SendJMSQueue";

           private QueueConnectionFactory qconFactory;

           private QueueConnection qcon;

           private QueueSession qsession;

           private QueueReceiver qreceiver;

           private Queue queue;

           private boolean quit = false;

           /**
            * Message listener interface.
            *
            * @param msg
            *            message
            */
           public void onMessage(Message msg) {
            try {
             String msgText;
             if (msg instanceof TextMessage) {
              msgText = ((TextMessage) msg).getText();
             } else {
              msgText = msg.toString();
             }

             System.out.println("Message Received: " + msgText);

             // The sender forwards its "quit" line as a message; use it to stop.
             if (msgText.equalsIgnoreCase("quit")) {
              synchronized (this) {
               quit = true;
               this.notifyAll(); // Notify main thread to quit
              }
             }
            } catch (JMSException jmse) {
             jmse.printStackTrace();
            }
           }

           /**
            * Creates all the necessary objects for receiving messages from a JMS
            * queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException {
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            qcon = qconFactory.createQueueConnection();
            qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = (Queue) ctx.lookup(queueName);
            qreceiver = qsession.createReceiver(queue);
            qreceiver.setMessageListener(this);
            qcon.start();
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            qreceiver.close();
            qsession.close();
            qcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if execution fails
            */

           public static void main(String[] args) throws Exception {

            InitialContext ic = getInitialContext();
            QueueReceive qr = new QueueReceive();
            qr.init(ic, QUEUE);

            System.out
              .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");

            // Keep main() alive so the listener can receive messages; the loop
            // ends once onMessage() sets quit.
            synchronized (qr) {
             while (!qr.quit) {
              try {
               qr.wait();
              } catch (InterruptedException ie) {
               // Restore the interrupt flag and stop waiting.
               Thread.currentThread().interrupt();
               break;
              }
             }
            }
            qr.close();
           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }

           

          TopicSend

          import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.Topic;
          import javax.jms.TopicConnection;
          import javax.jms.TopicConnectionFactory;
          import javax.jms.TopicSession;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;
          import javax.jms.TopicPublisher;
          public class TopicSend {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = "t3://localhost:7001";

           // Defines the JMS connection factory for the topic.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the topic.
           public final static String TOPIC = "SendJMSTopic";

           private TopicConnectionFactory tconFactory;

           private TopicConnection tcon;

           private TopicSession tsession;

           private TopicPublisher tsender;

           private Topic topic;

           private TextMessage msg;
           public static InitialContext ic ;

           /**
            * Creates all the necessary objects for publishing messages to a JMS
            * topic.
            *
            * @param ctx
            *            JNDI initial context
            * @param topicName
            *            name of topic
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String topicName) throws NamingException,
             JMSException {
            tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
            tcon = tconFactory.createTopicConnection();
            tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = (Topic) ctx.lookup(topicName);
            tsender = tsession.createPublisher(topic);
            msg = tsession.createTextMessage();
            tcon.start();
           }

           /**
            * Publishes a message to a JMS topic.
            *
            * @param message
            *            message to be published
            * @exception JMSException
            *                if JMS fails to publish message due to internal error
            */
           public void send(String message) throws JMSException {
            msg.setText(message);
            tsender.publish(msg);
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            tsender.close();
            tsession.close();
            tcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if operation fails
            */
           public static void main(String[] args) throws Exception {
            try {
             ic= getInitialContext();
             TopicSend ts = new TopicSend();
             ts.init(ic, TOPIC);
             readAndSend(ts);
             ts.close();
            } catch (Exception e) {
             e.printStackTrace();
            }
           }

           private static void readAndSend(TopicSend ts) throws IOException,
             JMSException,NamingException {
            BufferedReader msgStream = new BufferedReader(new InputStreamReader(
              System.in));
            String line = null;
            boolean quitNow = false;
            do {
             System.out.print("Enter message (\"quit\" to quit): ");
             line = msgStream.readLine();
             if (line != null && line.trim().length() != 0) {
              ts.send(line);
             
              System.out.println("JMS Message Sent: " + line + "\n");
              quitNow = line.equalsIgnoreCase("quit");
             }
            } while (!quitNow);

           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }

           

          TopicReceive1

          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageListener;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.Topic;
          import javax.jms.TopicConnection;
          import javax.jms.TopicConnectionFactory;
          import javax.jms.TopicSession;
          import javax.jms.TopicSubscriber;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;


          public class TopicReceive1 implements MessageListener {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = "t3://localhost:7001";

           // Defines the JMS connection factory for the topic.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the topic.
           public final static String TOPIC = "SendJMSTopic";

           private TopicConnectionFactory tconFactory;

           private TopicConnection tcon;

           private TopicSession tsession;

           private TopicSubscriber tsubscriber;

           private Topic topic;

           private boolean quit = false;

           /**
            * Message listener interface.
            *
            * @param msg
            *            message
            */
           public void onMessage(Message msg) {
            try {
             String msgText;
             if (msg instanceof TextMessage) {
              msgText = ((TextMessage) msg).getText();
             } else {
              msgText = msg.toString();
             }

             System.out.println("Message Received: " + msgText);

             // The sender forwards its "quit" line as a message; use it to stop.
             if (msgText.equalsIgnoreCase("quit")) {
              synchronized (this) {
               quit = true;
               this.notifyAll(); // Notify main thread to quit
              }
             }
            } catch (JMSException jmse) {
             jmse.printStackTrace();
            }
           }

           /**
            * Creates all the necessary objects for receiving messages from a JMS
            * topic through a non-durable subscription.
            *
            * @param ctx
            *            JNDI initial context
            * @param topicName
            *            name of topic
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String topicName) throws NamingException,
             JMSException {
            tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
            tcon = tconFactory.createTopicConnection();
            tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = (Topic) ctx.lookup(topicName);
            // Non-durable: only messages published while subscribed are delivered.
            tsubscriber = tsession.createSubscriber(topic);
            tsubscriber.setMessageListener(this);
            tcon.start();
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            tsubscriber.close();
            tsession.close();
            tcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if execution fails
            */

           public static void main(String[] args) throws Exception {

            InitialContext ic = getInitialContext();
            TopicReceive1 tr1 = new TopicReceive1();
            tr1.init(ic, TOPIC);

            System.out
              .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
            
            

            // Keep main() alive so the listener can receive messages; without
            // this wait the subscriber would be closed before anything arrives.
            // The loop ends once onMessage() sets quit.
            synchronized (tr1) {
             while (!tr1.quit) {
              try {
               tr1.wait();
              } catch (InterruptedException ie) {
               // Restore the interrupt flag and stop waiting.
               Thread.currentThread().interrupt();
               break;
              }
             }
            }
            tr1.close();
           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }


          TopicReceive2


          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.Topic;
          import javax.jms.TopicConnection;
          import javax.jms.TopicConnectionFactory;
          import javax.jms.TopicSession;
          import javax.jms.TopicSubscriber;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;


          public class TopicReceive2 {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = "t3://localhost:7001";

           // Defines the JMS connection factory for the topic.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the topic.
           public final static String TOPIC = "SendJMSTopic";

           private TopicConnectionFactory tconFactory;

           private TopicConnection tcon;

           private TopicSession tsession;

           private TopicSubscriber tsubscriber;

           private Topic topic;

           private boolean quit = false;


           /**
            * Creates all the necessary objects for receiving messages from a JMS
            * topic through a durable subscription, then receives three messages.
            *
            * @param ctx
            *            JNDI initial context
            * @param topicName
            *            name of topic
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            * @exception InterruptedException
            *                if the thread is interrupted while sleeping
            */
           public void init(Context ctx, String topicName) throws NamingException,
             JMSException,InterruptedException {
            tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
            tcon = tconFactory.createTopicConnection();
            // A durable subscription is identified by the client ID together
            // with the subscription name, so both must stay the same across runs.
            tcon.setClientID("IP10.200.7.104");
            tcon.start();

            tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = (Topic) ctx.lookup(topicName);
            tsubscriber = tsession.createDurableSubscriber(topic, "88888");
            // Block until three messages have arrived. Once the durable
            // subscription exists, messages published while this subscriber
            // was offline are retained and delivered here.
            for (int i = 0; i < 3; i++) {
             TextMessage message = (TextMessage) tsubscriber.receive();
             System.out.println("message[" + i + "]: " + message.getText());
            }
            Thread.sleep(10000);
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            tsubscriber.close();
            tsession.close();
            tcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if execution fails
            */

           public static void main(String[] args) throws Exception {

            InitialContext ic = getInitialContext();
            TopicReceive2 tr2 = new TopicReceive2();
            tr2.init(ic, TOPIC);

            // init() has already blocked until three messages were received,
            // so all that is left is to release the JMS objects.
            System.out.println("Received three messages; closing the subscriber.");
            tr2.close();
           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }

          posted on 2007-08-15 09:20 小小程序程序員混口飯吃  Category: java