隨筆-109  評論-187  文章-25  trackbacks-0

          由于一些原因,最近要復習一些東西,所以就把JMS的東西再復習一遍,以下便是例子

          jms 中最重要的幾個概念destination,ACKNOWLEDGE,subscribe,durable subscribe

          destination:topic queque
            queue簡單點說就是1:1 一個消息只能由一個consumer去消費,別的consumer來消費的時候已經沒了,先到先得
          topic簡單點說就是1:N 一個消息可以由多個consumer來消費,誰來消費都有
           subscribe,拿topic來說如果當前訂閱不是持久訂閱,只有再訂閱后生產者生產得消息才能被consumer得到,持久訂閱只要沒有被consumer消費,早晚會消費這個消息
           
           
           
           一下是幾個例子
           
           queuesend:queque消息產生
           queuereceive:queque消息得消費
           topicsend :topic消息得產生
           topicreceive1:topic消息的非訂閱
           topicrecieve2:topic消息的持久訂閱
           
           這個例子實在WEBLOGIC814上測試過的,當然要定義JMSSERVER,FACTORY,DESTINATION。
           
           
           
           QueueSend
           
           import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Queue;
          import javax.jms.QueueConnection;
          import javax.jms.QueueConnectionFactory;
          import javax.jms.QueueSender;
          import javax.jms.QueueSession;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;

          public class QueueSend {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = " t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String QUEUE = "SendJMSQueue";

           private QueueConnectionFactory qconFactory;

           private QueueConnection qcon;

           private QueueSession qsession;

           private QueueSender qsender;

           private Queue queue;

           private TextMessage msg;

           /**
            * Creates all the necessary objects for sending messages to a JMS queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException {
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            qcon = qconFactory.createQueueConnection();
            qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = (Queue) ctx.lookup(queueName);
            qsender = qsession.createSender(queue);
            msg = qsession.createTextMessage();
            qcon.start();
           }

           /**
            * Sends a message to a JMS queue.
            *
            * @param message
            *            message to be sent
            * @exception JMSException
            *                if JMS fails to send message due to internal error
            */
           public void send(String message) throws JMSException {
            msg.setText(message);
            qsender.send(msg);
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            qsender.close();
            qsession.close();
            qcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if operation fails
            */
           public static void main(String[] args) throws Exception {
            try {
             InitialContext ic = getInitialContext();
             QueueSend qs = new QueueSend();
             qs.init(ic, QUEUE);
             readAndSend(qs);
             qs.close();
            } catch (Exception e) {
             e.printStackTrace();
            }
           }

           private static void readAndSend(QueueSend qs) throws IOException,
             JMSException {
            BufferedReader msgStream = new BufferedReader(new InputStreamReader(
              System.in));
            String line = null;
            boolean quitNow = false;
            do {
             System.out.print("Enter message (\"quit\" to quit): ");
             line = msgStream.readLine();
             if (line != null && line.trim().length() != 0) {
              qs.send(line);
              System.out.println("JMS Message Sent: " + line + "\n");
              quitNow = line.equalsIgnoreCase("quit");
             }
            } while (!quitNow);

           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }


          QueueReceive

          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageListener;
          import javax.jms.Queue;
          import javax.jms.QueueConnection;
          import javax.jms.QueueConnectionFactory;
          import javax.jms.QueueReceiver;
          import javax.jms.QueueSession;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;

          public class QueueReceive implements MessageListener {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = " t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String QUEUE = "SendJMSQueue";

           private QueueConnectionFactory qconFactory;

           private QueueConnection qcon;

           private QueueSession qsession;

           private QueueReceiver qreceiver;

           private Queue queue;

           private boolean quit = false;

           /**
            * Message listener interface.
            *
            * @param msg
            *            message
            */
           public void onMessage(Message msg) {
            try {
             String msgText;
             if (msg instanceof TextMessage) {
              msgText = ((TextMessage) msg).getText();
             } else {
              msgText = msg.toString();
             }

             System.out.println("Message Received: " + msgText);

          //   if (msgText.equalsIgnoreCase("123")) {
          //    synchronized (this) {
          //     quit = true;
          //     this.notifyAll(); // Notify main thread to quit
          //    }
          //   }
            } catch (JMSException jmse) {
             jmse.printStackTrace();
            }
           }

           /**
            * Creates all the necessary objects for receiving messages from a JMS
            * queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException {
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            qcon = qconFactory.createQueueConnection();
            qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            queue = (Queue) ctx.lookup(queueName);
            qreceiver = qsession.createReceiver(queue);
            qreceiver.setMessageListener(this);
            qcon.start();
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            qreceiver.close();
            qsession.close();
            qcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if execution fails
            */

           public static void main(String[] args) throws Exception {

            InitialContext ic = getInitialContext();
            QueueReceive qr = new QueueReceive();
            qr.init(ic, QUEUE);

            System.out
              .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");

            // Wait until a "quit" message has been received.
            synchronized (qr) {
             System.out.println("111111111111");
             while (!qr.quit) {
              try {
               System.out.println("2222222222");
               qr.wait();
               System.out.println("333333333");
              } catch (InterruptedException ie) {
              }
             }
            }
            qr.close();
           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }

           

          TopicSend

          import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.Topic;
          import javax.jms.TopicConnection;
          import javax.jms.TopicConnectionFactory;
          import javax.jms.TopicSession;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;
          import javax.jms.TopicPublisher;
          public class TopicSend {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = " t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String TOPIC = "SendJMSTopic";

           private TopicConnectionFactory tconFactory;

           private TopicConnection tcon;

           private TopicSession tsession;

           private TopicPublisher tsender;

           private Topic topic;

           private TextMessage msg;
           public static InitialContext ic ;

           /**
            * Creates all the necessary objects for sending messages to a JMS queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException {
            tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
            tcon = tconFactory.createTopicConnection();
            tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.println(topic);
            topic = (Topic) ctx.lookup(queueName);
            System.out.println(topic);
            tsender = tsession.createPublisher(topic);
            msg = tsession.createTextMessage();
            tcon.start();
           }

           /**
            * Sends a message to a JMS queue.
            *
            * @param message
            *            message to be sent
            * @exception JMSException
            *                if JMS fails to send message due to internal error
            */
           public void send(String message) throws JMSException ,NamingException{
            System.out.println(topic+"-----------");
            msg.setText(message);
            tsender.publish(msg);
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            tsender.close();
            tsession.close();
            tcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if operation fails
            */
           public static void main(String[] args) throws Exception {
            try {
             ic= getInitialContext();
             TopicSend ts = new TopicSend();
             ts.init(ic, TOPIC);
             readAndSend(ts);
             ts.close();
            } catch (Exception e) {
             e.printStackTrace();
            }
           }

           private static void readAndSend(TopicSend ts) throws IOException,
             JMSException,NamingException {
            BufferedReader msgStream = new BufferedReader(new InputStreamReader(
              System.in));
            String line = null;
            boolean quitNow = false;
            do {
             System.out.print("Enter message (\"quit\" to quit): ");
             line = msgStream.readLine();
             if (line != null && line.trim().length() != 0) {
              ts.send(line);
             
              System.out.println("JMS Message Sent: " + line + "\n");
              quitNow = line.equalsIgnoreCase("quit");
             }
            } while (!quitNow);

           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }

           

          TopicReceive1

          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Message;
          import javax.jms.MessageListener;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.Topic;
          import javax.jms.TopicConnection;
          import javax.jms.TopicConnectionFactory;
          import javax.jms.TopicSession;
          import javax.jms.TopicSubscriber;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;


          public class TopicReceive1 implements MessageListener {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = " t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String TOPIC = "SendJMSTopic";

           private TopicConnectionFactory tconFactory;

           private TopicConnection tcon;

           private TopicSession tsession;

           private TopicSubscriber tsubscriber;

           private Topic topic;

           private boolean quit = false;

           /**
            * Message listener interface.
            *
            * @param msg
            *            message
            */
           public void onMessage(Message msg) {
            System.out.println("===================");
            try {
             String msgText;
             if (msg instanceof TextMessage) {
              msgText = ((TextMessage) msg).getText();
             } else {
              msgText = msg.toString();
             }

             System.out.println("Message Received: " + msgText);

          //   if (msgText.equalsIgnoreCase("123")) {
          //    synchronized (this) {
          //     quit = true;
          //     this.notifyAll(); // Notify main thread to quit
          //    }
          //   }
            } catch (JMSException jmse) {
             jmse.printStackTrace();
            }
           }

           /**
            * Creates all the necessary objects for receiving messages from a JMS
            * queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException {
            tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
            tcon = tconFactory.createTopicConnection();
            tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = (Topic) ctx.lookup(queueName);
            tsubscriber = tsession.createSubscriber(topic);
            //System.out.println("12");
            //Message msg = treceiver.receive();
            //msg.acknowledge();
            //tsubscriber = tsession.createSubscriber(topic);Message msg = tsubscriber.receive();msg.acknowledge();
            //System.out.println(msg);
            tsubscriber.setMessageListener(this);
            tcon.start();
           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            tsubscriber.close();
            tsession.close();
            tcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if execution fails
            */

           public static void main(String[] args) throws Exception {

            InitialContext ic = getInitialContext();
            TopicReceive1 tr1 = new TopicReceive1();
            tr1.init(ic, TOPIC);

            System.out
              .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
            
            

            // Wait until a "quit" message has been received.
          //  synchronized (tr1) {
          //   System.out.println("111111111111");
          //   while (!tr1.quit) {
          //    try {
          //     System.out.println("2222222222");
          //     tr1.wait();
          //     System.out.println("333333333");
          //    } catch (InterruptedException ie) {
          //    }
          //   }
          //  }
            tr1.close();
           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }


          TopicReceive2


          import java.util.Hashtable;

          import javax.jms.JMSException;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.Topic;
          import javax.jms.TopicConnection;
          import javax.jms.TopicConnectionFactory;
          import javax.jms.TopicSession;
          import javax.jms.TopicSubscriber;
          import javax.naming.Context;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;


          public class TopicReceive2 {
           // Defines the JNDI context factory.
           public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

           // Defines the JNDI provider url.
           public final static String PROVIDER_URL = " t3://localhost:7001";

           // Defines the JMS connection factory for the queue.
           public final static String JMS_FACTORY = "SendJMSFactory";

           // Defines the queue.
           public final static String TOPIC = "SendJMSTopic";

           private TopicConnectionFactory tconFactory;

           private TopicConnection tcon;

           private TopicSession tsession;

           private TopicSubscriber tsubscriber;

           private Topic topic;

           private boolean quit = false;


           /**
            * Creates all the necessary objects for receiving messages from a JMS
            * queue.
            *
            * @param ctx
            *            JNDI initial context
            * @param queueName
            *            name of queue
            * @exception NamingException
            *                if operation cannot be performed
            * @exception JMSException
            *                if JMS fails to initialize due to internal error
            */
           public void init(Context ctx, String queueName) throws NamingException,
             JMSException,InterruptedException {
            tconFactory = (TopicConnectionFactory) ctx.lookup(JMS_FACTORY);
            tcon = tconFactory.createTopicConnection();
            tcon.setClientID("IP10.200.7.104");
            tcon.start();
            
            tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.println("333333333333");
            topic = (Topic) ctx.lookup(queueName);
            //tsubscriber = tsession.createSubscriber(topic);
            tsubscriber= tsession.createDurableSubscriber(topic,"88888");
             for (int i=0; i<3; i++) {
                       //
                       TextMessage message = (TextMessage) tsubscriber.receive();
                       System.out.println("message["+i+"]: " + message.getText());
                   }
                    Thread.sleep(10000);
            //System.out.println("12");
            //Message msg = treceiver.receive();
            //msg.acknowledge();
            //tsubscriber = tsession.createSubscriber(topic);Message msg = tsubscriber.receive();msg.acknowledge();
            //System.out.println(msg);

           }

           /**
            * Closes JMS objects.
            *
            * @exception JMSException
            *                if JMS fails to close objects due to internal error
            */
           public void close() throws JMSException {
            tsubscriber.close();
            tsession.close();
            tcon.close();
           }

           /**
            * main() method.
            *
            * @param args
            *            WebLogic Server URL
            * @exception Exception
            *                if execution fails
            */

           public static void main(String[] args) throws Exception {

            InitialContext ic = getInitialContext();
            TopicReceive2 tr2 = new TopicReceive2();
            tr2.init(ic, TOPIC);

            System.out
              .println("JMS Ready To Receive Messages (To quit, send a \"quit\" message).");
            
            

            // Wait until a "quit" message has been received.
          //  synchronized (tr1) {
          //   System.out.println("111111111111");
          //   while (!tr1.quit) {
          //    try {
          //     System.out.println("2222222222");
          //     tr1.wait();
          //     System.out.println("333333333");
          //    } catch (InterruptedException ie) {
          //    }
          //   }
          //  }
            tr2.close();
           }

           private static InitialContext getInitialContext() throws NamingException {
            Hashtable env = new Hashtable();
            env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
            env.put(Context.PROVIDER_URL, PROVIDER_URL);
            return new InitialContext(env);
           }

          }

          posted on 2007-08-15 09:20 小小程序程序員混口飯吃 閱讀(1175) 評論(0)  編輯  收藏 所屬分類: java
          主站蜘蛛池模板: 高清| 贵溪市| 贺兰县| 沽源县| 雅江县| 通州区| 彰武县| 柳江县| 高雄市| 普洱| 扎赉特旗| 唐山市| 关岭| 榆中县| 宣汉县| 贡嘎县| 剑川县| 玉溪市| 连平县| 茶陵县| 钟山县| 黑龙江省| 阿尔山市| 阳江市| 互助| 长垣县| 靖边县| 清远市| 江孜县| 元氏县| 南昌县| 榕江县| 宁河县| 井冈山市| 台北县| 木里| 锡林浩特市| 汶上县| 柘城县| 金山区| 张北县|