軟件藝術思考者  
          混沌,彷徨,立志,蓄勢...
          公告
          日歷
          <2025年5月>
          27282930123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          導航

          隨筆分類(86)

          隨筆檔案(85)

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

           
           首 先可以到網站上下載最新的openjms版本,然后啟動bin下的startup.bat就啟動服務了,然后可以啟動admin.bat進入管理界面(非 必要步驟),然后分別運行samples下的basic例子就可以看到效果了,當然以前有篇文章介紹的不錯,可以參考一下,不過時間上比較老了,做為參考 還是可以的。

          本文介紹開源的JMS服務器openJms,及怎樣使用openJms來構建系統之間健全、高度可用的通訊,從而簡化企業級應用的開發。 openJms符合SUN的JMS API 1.0.2規范,支持消息隊列,還支持消息傳遞的發布/訂閱模式,本文先就系統服務的搭建及JMS的非結構化消息發送和接收進行說明。

          JMS 有五種消息類型。三種結構化或半結構化的消息類型(MapMessage、ObjectMessage 和 StreamMessage)以及兩種非結構化的或自由格式的消息類型(TextMessage 和 BytesMessage)。而這里雖然我們只對非結構化消息進行說明,但非結構化的消息格式卻能夠更好地進行交互操作,因為它們在消息上很少利用結構, 在此基礎上與XML再進行結合,將能方便的進行更好的擴展,XML相關簡化操作參考《Jaxb來實現Java-XML的轉換》。

          下面具體來介紹服務器搭建,在http://openjms.sourceforge.net/downloads.html下載openJms,解壓后 可以直接使用,在 "openjms-0.7.6.1"bin 里,有openJms的運行腳本,執行 startup 啟動,彈出一個新的窗口,服務就運行在新窗口內,shutdown 為停止命令:

          服務運行后,就可以開始使用JMS服務了,至此服務搭建完畢,簡單得不能再簡單了。

          下面是消息發送和接收的開發,開發中需要的jar包在"openjms-0.7.6.1"lib里可以找到:
          openjms-0.7.6.1.jar
          jms-1.0.2a.jar
          exolabcore-0.3.7.jar
          commons-logging-1.0.3.jar

          把上面的類包加入到項目中,下面是消息發送服務的代碼:

          package javayou.demo.openjms;
          import java.util.*;
          import javax.jms.*;
          import javax.naming.*;

          public class QueueSend {
              public static void main(String[] args) {
                  try {
                      //取得JNDI上下文和連接
                      Hashtable properties = new Hashtable();
                      properties.put(
                          Context.INITIAL_CONTEXT_FACTORY,
                          "org.exolab.jms.jndi.InitialContextFactory");
                      //openJms默認的端口是1099
                      properties.put(Context.PROVIDER_URL,
                           "rmi://localhost:1099/");
                      Context context = new InitialContext(properties);

                      //獲得JMS信息連接隊列工廠
                      QueueConnectionFactory queueConnectionFactory =
                          (QueueConnectionFactory) context.lookup(
                              "JmsQueueConnectionFactory");
                      //獲得JMS信息連接隊列
                      QueueConnection queueConnection =
                          queueConnectionFactory.createQueueConnection();
                      //產生隊列Session,設置事務為false,自動應答消息接收
                      QueueSession queueSession =
                          queueConnection.createQueueSession(
                              false,
                              Session.AUTO_ACKNOWLEDGE);

                      //獲得默認內建在JMS里的隊列之一:queue1
                      Queue queue = (Queue) context.lookup("queue1");
                      //產生JMS隊列發送器
                      QueueSender queueSender =
                          queueSession.createSender(queue);
                      //發送數據到JMS
                      TextMessage message = queueSession.createTextMessage();
                      message.setText("Hello, I'm openJms.");
                      queueSender.send(message);

                      System.out.println(
                          ""信息寫入JMS服務器隊列");

                      //以下做清除工作,代碼略
                      // ... ...
                                
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          執行程序發送消息,然后打開JMS控制臺,用 admin 命令啟動管理平臺,點擊菜單Actions-Connections-online,出現界面如下:

          可以看到JSM默認的隊列queue1里已經有1條消息了,而其他的隊列還是空著的。

          下面我們來看看消息接收服務的代碼:

          package javayou.demo.openjms;
          import java.util.*;
          import javax.jms.*;
          import javax.naming.*;
          public class QueueReceiveSynchronous {
              public static void main(String[] args) {
                  try {
                      //取得JNDI上下文和連接
                      Hashtable properties = new Hashtable();
                      properties.put(
                          Context.INITIAL_CONTEXT_FACTORY,
                          "org.exolab.jms.jndi.InitialContextFactory");
                      properties.put(Context.PROVIDER_URL,
                          "rmi://localhost:1099/");
                      Context context = new InitialContext(properties);

                      //獲得JMS信息連接隊列工廠
                      QueueConnectionFactory queueConnectionFactory =
                          (QueueConnectionFactory) context.lookup(
                              "JmsQueueConnectionFactory");

                      //獲得JMS信息連接隊列
                      QueueConnection queueConnection =
                          queueConnectionFactory.createQueueConnection();

                      //啟動接收隊列線程
                      queueConnection.start();
                      //產生隊列Session,設置事務為false,自動應答消息接收
                      QueueSession queueSession =
                          queueConnection.createQueueSession(
                              false,
                              Session.AUTO_ACKNOWLEDGE);
                      //獲得默認內建在JMS里的隊列之一:queue1
                      Queue queue = (Queue) context.lookup("queue1");
                      //產生JMS隊列接收器
                      QueueReceiver queueReceiver =
                          queueSession.createReceiver(queue);
                      //通過同步的方法接收消息
                      Message message = queueReceiver.receive();
                      String messageText = null;
                      if (message instanceof TextMessage)
                          messageText = ((TextMessage) message).                        
                              getText();
                      System.out.println(messageText);
                      //以下做清除工作,代碼略
                      // ... ...
                    
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          編譯后運行接收信息服務,可以看到接收到并打印之前發送的消息,再看看控制臺,發現queue1的消息隊列變為0,消息已被讀取,消息發送和接收到此結束。



          上篇openJms介紹 (一) 提到了openJms的構建及消息的發送和接收,這篇主要了解消息的發布和訂閱。JMS 的發布/訂閱模型定義了如何向一個內容節點發布和訂閱消息,內容節點也叫主題(topic),主題是為發布者(publisher)和訂閱者 (subscribe) 提供傳輸的中介。發布/訂閱模型使發布者和訂閱者之間不需要直接通訊(如RMI)就可保證消息的傳送,有效解決系統間耦合問題(當然有這個需要才行),還 有就是提供了一對一、一對多的通訊方式,比較靈活。

          先介紹JMS里2個概念,持久訂閱模式和非持久訂閱模式,其實也是發布/訂閱模型在可靠性上提供的2種方式:

          非持久訂閱模式:只有當客戶端處于激活狀態,也就是和JMS 服務器保持連接的狀態下,才能接收到發送到某個Topic的消息,而當客戶端處于離線狀態時,則這個時間段發到Topic的消息將會永遠接收不到。

          持久訂閱模式:客戶端向JMS 注冊一個識別自己身份的ID,當這個客戶端處于離線時,JMS 服務器會為這個ID 保存所有發送到主題的消息,當客戶再次連接到JMS 服務器時,會根據自己的ID 得到所有當自己處于離線時發送到主題的消息,即消息永遠能接收到。

          下面我們就接著來看openJms在發布/訂閱模式上的表現,由于篇幅關系,在這里只講述非持久訂閱模式,持久訂閱模式可以根據JMS的標準來試。

          消息發布的代碼如下:

          package javayou.demo.openjms;
          import java.util.*;
          import javax.jms.*;
          import javax.naming.*;
          public class TopicPublish {
              public static void main(String[] args) {
                  try {
                      //取得JNDI上下文和連接
                      Hashtable properties = new Hashtable();
                      properties.put(
                          Context.INITIAL_CONTEXT_FACTORY,
                          "org.exolab.jms.jndi.InitialContextFactory");
                      //openJms默認的端口是1099
                      properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
                      Context context = new InitialContext(properties);
                      //獲得JMS Topic連接隊列工廠
                      TopicConnectionFactory factory =
                          (TopicConnectionFactory) context.lookup(
                              "JmsTopicConnectionFactory");

                      //創建一個Topic連接,并啟動
                      TopicConnection topicConnection = factory.createTopicConnection();
                      topicConnection.start();

                      //創建一個Topic會話,并設置自動應答
                      TopicSession topicSession =
                          topicConnection.createTopicSession(false,
                                  Session.AUTO_ACKNOWLEDGE);

                      //lookup 得到 topic1
                      Topic topic = (Topic) context.lookup("topic1");
                      //用Topic會話生成Topic發布器
                      TopicPublisher topicPublisher = topicSession.createPublisher(topic);

                      //發布消息到Topic
                      System.out.println("消息發布到Topic");
                      TextMessage message = topicSession.createTextMessage
                          ("你好,歡迎定購Topic類消息");
                      topicPublisher.publish(message);

                      //資源清除,代碼略 ... ...   
                  } catch (NamingException e) {
                      e.printStackTrace();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
          }

          而訂閱消息的接收有同步的和異步2種,他們分別使用receive()和onMessage(Message message)方法來接收消息,具體代碼:

          同步接收:

          package javayou.demo.openjms;
          import java.util.*;
          import javax.jms.*;
          import javax.naming.*;
          public class TopicSubscribeSynchronous {

              public static void main(String[] args) {
                  try {
                      System.out.println("定購消息接收啟動:");
                      //取得JNDI上下文和連接
                      Hashtable properties = new Hashtable();
                      properties.put(Context.INITIAL_CONTEXT_FACTORY,
                          "org.exolab.jms.jndi.InitialContextFactory");
                      properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
                      Context context = new InitialContext(properties);

                      //獲得Topic工廠和Connection
                      TopicConnectionFactory factory =
                          (TopicConnectionFactory) context.lookup(
                              "JmsTopicConnectionFactory");
                      TopicConnection topicConnection = factory.createTopicConnection();
                      topicConnection.start();

                      //創建Topic的會話,用于接收信息
                      TopicSession topicSession =
                          topicConnection.createTopicSession(
                              false,
                              Session.AUTO_ACKNOWLEDGE);

                      //lookup topic1
                      Topic topic = (Topic) context.lookup("topic1");
                              //創建Topic subscriber
                      TopicSubscriber topicSubscriber =
                          topicSession.createSubscriber(topic);
                      //收滿10條訂閱消息則退出
                      for (int i=0; i<10; i++) {
                          //同步消息接收,使用receive方法,堵塞等待,直到接收消息
                          TextMessage message = (TextMessage) topicSubscriber.receive();
                          System.out.println("接收訂閱消息["+i+"]: " + message.getText());
                      }
                      //資源清除,代碼略 ... ...
                      System.out.println("訂閱接收結束.");
                  } catch (NamingException e) {
                      e.printStackTrace();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
          }

          非同步接收:

          package javayou.demo.openjms;
          import java.util.*;
          import javax.jms.*;
          import javax.naming.*;
          public class TopicSubscribeAsynchronous implements MessageListener {
              private TopicConnection topicConnection;
              private TopicSession topicSession;
              private Topic topic;
              private TopicSubscriber topicSubscriber;

              TopicSubscribeAsynchronous() {
                  try {
                      //取得JNDI上下文和連接
                      Hashtable properties = new Hashtable();
                      properties.put(
                          Context.INITIAL_CONTEXT_FACTORY,
                          "org.exolab.jms.jndi.InitialContextFactory");
                      properties.put(Context.PROVIDER_URL, "rmi://localhost:1099/");
                      Context context = new InitialContext(properties);

                      //取得Topic的連接工廠和連接
                      TopicConnectionFactory topicConnectionFactory =
                          (TopicConnectionFactory) context.lookup(
                              "JmsTopicConnectionFactory");
                      topicConnection = topicConnectionFactory.createTopicConnection();

                      //創建Topic的會話,用于接收信息
                      topicSession =
                          topicConnection.createTopicSession(false,
                              Session.AUTO_ACKNOWLEDGE);
                      topic = (Topic) context.lookup("topic1");

                      //創建Topic subscriber
                      topicSubscriber = topicSession.createSubscriber(topic);
                      //設置訂閱監聽
                      topicSubscriber.setMessageListener(this);

                      //啟動信息接收
                      topicConnection.start();
                  } catch (NamingException e) {
                      e.printStackTrace();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }

              public static void main(String[] args) {
                  System.out.println("非同步定購消息的接收:");
                  try {
                      TopicSubscribeAsynchronous listener =
                          new TopicSubscribeAsynchronous();
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }

              //收到訂閱信息后自動調用此方法
              public void onMessage(Message message) {
                  try {
                      String messageText = null;
                      if (message instanceof TextMessage)
                          messageText = ((TextMessage) message).getText();
                      System.out.println(messageText);
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
          }

          編譯好后,啟動openJms服務,打開admin管理臺,為了運行方便,這里先列出三個類的運行命令:
          java -cp ."; -Djava.ext.dirs=."lib; javayou.demo.openjms.TopicPublish
          java -cp ."; -Djava.ext.dirs=."lib; javayou.demo.openjms.TopicSubscribeSynchronous
          java -cp ."; -Djava.ext.dirs=."lib; javayou.demo.openjms.TopicSubscribeAsynchronous

          先運行2個接收命令,再運行發布命令,可以看到控制臺的Topic有消息接收,并且接收1和2都有消息接收的提示,到此完成演示,由于是非持久訂閱,所以可以看到控制臺上的Topic消息條數不會減少。
          posted on 2009-09-08 11:25 智者無疆 閱讀(362) 評論(1)  編輯  收藏 所屬分類: about java
          評論:
          • # re: JMS服務器openJms入門  zhanglijun Posted @ 2009-12-22 14:21
            fileUtil downloadfile
            public ActionForward downLoadTSFile(ActionMapping mapping, ActionForm form,
            HttpServletRequest request, HttpServletResponse response) {
            String fileId = request.getParameter("fileId");
            String fileName = "";
            //找到下載文件的物理路徑
            String fullPath = "/media/HSKBoss/software/";
            /*1.文件閱讀器
            2.24h分析軟件
            3.加密狗
            4.rar安裝文件*/
            if("1".equals(fileId)){
            fileName = "bioxreader.rar";
            }else if("2".equals(fileId)){
            fileName = "HSKBoss24H.rar";
            }else if("3".equals(fileId)){
            fileName = "passdog.rar";
            }else if("4".equals(fileId)){
            fileName = "wrar380sc.exe";
            }
            //判斷文件是否存在
            File file = new File(fullPath);
            if(file.exists()){
            try {
            //如果存在則下載
            response.reset(); //一定要,很重要
            response.setContentType("application/x-msdownload");
            response.setHeader("Content-Disposition", "attachment; filename=" + fileName);

            //將文件內容讀取到文件輸入流中
            FileInputStream fis = new FileInputStream(fullPath+"/"+fileName);
            BufferedInputStream bis = new BufferedInputStream(fis);
            //得到輸出流
            OutputStream os = response.getOutputStream();
            BufferedOutputStream bos = new BufferedOutputStream(os);
            //開始寫到客戶端
            int block = 2048;
            int actual = 0;
            byte[] bs = new byte[block];
            while((actual = bis.read(bs)) > 0){
            bos.write(bs, 0, actual);
            }

            bos.close();
            bis.close();
            os.close();
            fis.close();
            } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            }
            }
            return null;
            }  回復  更多評論   

           
          Copyright © 智者無疆 Powered by: 博客園 模板提供:滬江博客


             觀音菩薩贊

          主站蜘蛛池模板: 砚山县| 盘锦市| 柳河县| 桐庐县| 马龙县| 唐海县| 赤城县| 林西县| 武功县| 渝中区| 新野县| 铁岭市| 西宁市| 邹平县| 阜平县| 广安市| 乐亭县| 称多县| 万安县| 黔南| 铜陵市| 定边县| 阿拉善右旗| 扎兰屯市| 威海市| 米泉市| 邯郸市| 东丽区| 滨海县| 江西省| 井冈山市| 孝感市| 临桂县| 中江县| 咸宁市| 铁岭市| 静海县| 山丹县| 东山县| 昔阳县| 荣昌县|