Knight of the round table

          wansong

          JGroups 簡介、適用場合、配置、程序例子Demo等完全使用指南

          目前項目中在克服JGroups初期使用的困難之后,已經使用比較穩定了。感覺比較煩瑣和容易出錯的還是JGroups配置。感覺中文的資源較少,現將自己總結的經驗總結如下
          Tim http://hi.baidu.com/jabber/blog/item/7e879852a23efd0f0cf3e3ea.html

          http://puras.iteye.com/blog/81783

          JGroups 適合使用場合
          服務器集群cluster、多服務器通訊、服務器replication(復制)等,分布式cache緩存

          JGroups 簡介
          JGroups是一個基于Java語言的提供可靠多播(組播)的開發工具包。在IP Multicast基礎上提供可靠服務,也可以構建在TCP或者WAN上。主要是由Bela Ban開發,屬于JBoss.org,在JBoss的網站也有一些相關文檔。目前在 SourceForge上還是比較活躍,經常保持更新。

          JGroups 配置

          PING: 發現初始成員
          MERGE2: 將網絡層切分的包重新合并。
          FD_SOCK: Failure Dectection 錯誤檢測,基于TCP
          FD:Failure Dectection 錯誤檢測,基于心跳
          VERIFY_SUSPECT: 檢查貌似失敗的節點
          pbcast.NAKACK: 應答,提供可靠傳輸
          UNICAST: 可靠的UNICAST
          pbcast.STABLE: 計算廣播信息是否穩定
          VIEW_SYNC: 定期廣播view(成員名單)
          pbcast.GMS: Group membership, 處理joins/leaves/crashes等
          FC: 流量控制
          FRAG2:Fragmentation layer,分包,將大的數據包分拆成適合網絡層傳輸

          以上一些是比較重要的配置,基本上不能少。如果要深入研究可以在 org.jgroups.protocols 里面查看源代碼

          JGroups使用例子, JGroups demo, Tim的hello world例子
          Timreceiver.java

          import org.jgroups.tests.perf.Receiver;
          import org.jgroups.tests.perf.Transport;
          import org.jgroups.util.Util;

          public class TimReceiver implements Receiver {
          private Transport transport = null;

          public static void main(String[] args) {
          TimReceiver t = new TimReceiver();
          try {
          int sendMsgCount = 5000;
          int msgSize = 1000;
          t.start();

          t.sendMessages(sendMsgCount, msgSize);
          System.out.println("########## Begin to recv...");
          Thread.currentThread().join();
          } catch (Exception e) {
          e.printStackTrace();
          } finally {
          if (t != null) {
          t.stop();
          }
          }
          }

          public void start()
          throws Exception {
          transport = (Transport) new TimTransport();
          transport.create(null);
          transport.setReceiver(this);
          transport.start();
          }

          public void stop() {
          if (transport != null) {
          transport.stop();
          transport.destroy();
          }
          }

          private int count = 0;
          public void receive(Object sender, byte[] data) {
          System.out.print(".");
          if (++count == 5000) {
          System.out.println("\r\nRECV DONE.");
          System.exit(0);
          }

          }

          private void sendMessages(int count, int msgSize)
          throws Exception {
          byte[] buf = new byte[msgSize];
          for (int k = 0; k < msgSize; k++)
          buf[k] = 'T';

          System.out.println("-- sending " + count + " " + Util.printBytes(msgSize) + " messages");

          for (int i = 0; i < count; i++) {
          transport.send(null, buf);
          }

          System.out.println("######### send complete");
          }
          }

          TimTransport.java


          import java.util.Map;
          import java.util.Properties;

          import org.jgroups.Address;
          import org.jgroups.JChannel;
          import org.jgroups.Message;
          import org.jgroups.ReceiverAdapter;
          import org.jgroups.tests.perf.Receiver;
          import org.jgroups.tests.perf.Transport;

          public class TimTransport extends ReceiverAdapter implements Transport{
          private JChannel channel = null;
          private String groupName = "TimDemo";
          private Receiver receiver = null;

          String PROTOCOL_STACK_UDP1 = "UDP(bind_addr=192.168.100.59";
          String PROTOCOL_STACK_UDP2 = ";mcast_port=8888";
          String PROTOCOL_STACK_UDP3 = ";mcast_addr=225.1.1.1";
          String PROTOCOL_STACK_UDP4 = ";tos=8;loopback=false;max_bundle_size=64000;" +
          "use_incoming_packet_handler=true;use_outgoing_packet_handler=false;ip_ttl=2;enable_bundling=true):"
          + "PING:MERGE2:FD_SOCK:FD:VERIFY_SUSPECT:"
          +"pbcast.NAKACK(gc_lag=50;max_xmit_size=50000;use_mcast_xmit=false;" +
          "retransmit_timeout=300,600,1200,2400,4800;discard_delivered_msgs=true):"
          +"UNICAST:pbcast.STABLE:VIEW_SYNC:"
          +"pbcast.GMS(print_local_addr=false;join_timeout=3000;" +
          "join_retry_timeout=2000;" +
          "shun=true;view_bundling=true):"
          +"FC(max_credits=2000000;min_threshold=0.10):FRAG2(frag_size=50000)";


          public Object getLocalAddress() {
          return channel != null ? channel.getLocalAddress() : null;
          }

          public void start() throws Exception {
          channel.connect(groupName);
          }

          public void stop() {
          if (channel != null) {
          channel.shutdown();
          }
          }

          public void destroy() {
          if (channel != null) {
          channel.close();
          channel = null;
          }
          }

          public void setReceiver(Receiver r) {
          this.receiver = r;
          }

          public Map dumpStats() {
          return channel != null ? channel.dumpStats() : null;
          }

          public void send(Object destination, byte[] payload) throws Exception {
          byte[] tmp = new byte[payload.length];
          System.arraycopy(payload, 0, tmp, 0, payload.length);
          Message msg = null;
          msg = new Message((Address) destination, null, tmp);
          if (channel != null) {
          channel.send(msg);
          }
          }

          public void receive(Message msg) {
          Address sender = msg.getSrc();
          byte[] payload = msg.getBuffer();
          if (receiver != null) {
          try {
          receiver.receive(sender, payload);
          } catch (Throwable tt) {
          tt.printStackTrace();
          }
          }
          }

          public void create(Properties config) throws Exception {
          String PROTOCOL_STACK = PROTOCOL_STACK_UDP1 + PROTOCOL_STACK_UDP2 + PROTOCOL_STACK_UDP3 + PROTOCOL_STACK_UDP4;
          channel = new JChannel(PROTOCOL_STACK);
          channel.setReceiver(this);
          }

          public void send(Object destination, byte[] payload, boolean oob) throws Exception {
          send(destination, payload);
          }
          }

          posted on 2011-08-06 20:19 w@ns0ng 閱讀(636) 評論(0)  編輯  收藏 所屬分類: GridGain

          主站蜘蛛池模板: 南皮县| 丰台区| 东丰县| 图们市| 莱西市| 崇仁县| 富川| 霍州市| 丽水市| 莱阳市| 中方县| 博白县| 南岸区| 定陶县| 明水县| 花莲市| 尉犁县| 那坡县| 泸州市| 华安县| 金平| 什邡市| 凌海市| 沧源| 堆龙德庆县| 太仆寺旗| 武威市| 泉州市| 南江县| 建平县| 沅陵县| 姚安县| 黄骅市| 龙井市| 山东省| 寿光市| 垫江县| 江油市| 安岳县| 屏山县| 泾阳县|