使用ZooKeeper為CXF或其他服務動態更新服務器信息




          ZooKeeper是一個優秀的協調服務, 目前是Hadoop的一個子項目. 我們可以用它來為我們的服務提供配置中心, 注冊中心, 分布式同步鎖, 消息隊列等服務, 更多信息請瀏覽 http://hadoop.apache.org/zookeeper/

          上篇文章中實現一個CXF的負載均衡服務, 本次我們使用ZooKeeper來為我們的服務提供動態服務器列表, 以便把客戶端的調用分配到各個有效的服務上去.


          動態更新服務列表有2種方法
            * 定時去獲取數據, 更新我們的數據 --- 通用
            * 使用ZooKeeper的watch特性, 有服務器加入/退出時我們自動獲取通知 --- 適用于有消息通知機制的


             首先我們的HelloService部分要向ZooKeeper注冊
              只有注冊到ZooKeeper上, 我們才知道你可以提供這個服務. 在實際環境中, 需要每個服務都需要向ZooKeeper注冊 ()
             
              注冊代碼如下:

              private void register2Zookeeper(String address) throws Exception
              {
                  ZooKeeper zk 
          = new ZooKeeper(zkAddress, 3000null);

                  GroupMemberCenter gmc 
          = new GroupMemberCenter();
                  gmc.setZooKeeper(zk);

                  gmc.createAndSetGroup(groupName);
                  gmc.joinGroupByDefine(address);

                  System.out.println(
          "register service to zookeeper: " + address);
              }





              GroupMemberCenter是一個輔助類, 代碼如下:


              /**
               * Dynamic member center.
               * <p/>
               * The member maybe leave or dead dynamiclly.
               *
               *
               * 
          @author: Felix Zhang  Date: 2010-9-30 17:58:16
               
          */
              
          public class GroupMemberCenter
              {
                  
          public static final String ESCAPE_PREFIX = "|||";

                  
          private static final Log log = LogFactory.getLog(GroupMemberCenter.class);
                  
          private static final List<String> EMPTY_MEMBERS = new ArrayList<String>(0);

                  
          private ZooKeeper zk;
                  
          private String group = "";

                  
          private String me = "";

                  
          public void setZooKeeper(ZooKeeper zk)
                  {
                      
          this.zk = zk;
                  }

                  
          public void setGroup(String groupName)
                  {
                      
          if (groupName != null && groupName.length() > 0)
                      {
                          
          if (!groupName.startsWith("/"))
                          {
                              groupName 
          = "/" + groupName;
                          }

                          
          this.group = groupName;
                      }
                  }

                  
          public boolean createAndSetGroup(String groupName)
                  {
                      
          boolean result = createGroup(groupName);

                      
          if (result)
                      {
                          setGroup(groupName);
                      }

                      
          return result;
                  }

                  
          public boolean createGroup(String groupName)
                  {
                      
          assert groupName != null;

                      
          if (!groupName.startsWith("/"))
                      {
                          groupName 
          = "/" + groupName;
                      }

                      
          try
                      {
                          Stat s 
          = zk.exists(groupName, false);

                          
          if (s == null)
                          {
                              zk.create(groupName, 
          new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                          }
                      }
                      
          catch (Exception e)
                      {
                          log.error(
          "create group error: " + groupName, e);
                          
          return false;
                      }
                      
          return true;
                  }

                  
          protected String buildName(String name)
                  {
                      
          return group + "/" + name;
                  }

                  
          public boolean joinGroup()
                  {
                      
          return joinGroup(null);
                  }

                  
          public boolean joinGroup(Integer port)
                  {
                      
          try
                      {
                          
          //use ipaddress as default, if you will use different ipaddress, you need joinGroup(yourip)
                          me = InetAddress.getLocalHost().getHostAddress();
                          
          return joinGroupByDefine(me + ":" + port);
                      }
                      
          catch (Exception e)
                      {
                          log.error(
          "join group error", e);
                          
          return false;
                      }
                  }

                  
          public boolean joinGroupByDefine(String userdefine)
                  {
                      
          assert userdefine != null;
                      
          assert userdefine.length() > 0;

                      
          try
                      {
                          me 
          = userdefine;
                          
          if (me.contains("[host]"))
                          {
                              String host 
          = InetAddress.getLocalHost().getHostAddress();
                              me 
          = me.replaceFirst("\\[host\\]", host);
                          }

                          
          //if contains "/", how to deal?      --- maybe we need more format in future
                          me = ESCAPE_PREFIX + URLEncoder.encode(me, "UTF-8");

                          zk.create(buildName(me), 
          new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                      }
                      
          catch (Exception e)
                      {
                          log.error(
          "join group error: " + me, e);
                          
          return false;
                      }

                      
          return true;
                  }

                  
          public void leaveGroup()
                  {
                      
          try
                      {
                          zk.delete(buildName(me), 
          0);
                      }
                      
          catch (Exception e)
                      {
                          log.error(
          "leave group error: " + me, e);
                      }
                  }

                  
          public List<String> fetchGroupMembers()
                  {
                      
          return fetchGroupMembers(group, null);
                  }

                  
          public List<String> fetchGroupMembers(String groupName)
                  {
                      
          return fetchGroupMembers(groupName, null);
                  }

                  
          public List<String> fetchGroupMembers(String groupName, Watcher watcher)
                  {
                      
          if (groupName != null && groupName.length() > 0)
                      {
                          
          if (!groupName.startsWith("/"))
                          {
                              groupName 
          = "/" + groupName;
                          }
                      }
                      
          else
                      {
                          
          return EMPTY_MEMBERS;
                      }

                      
          try
                      {
                          List
          <String> childlist;
                          
          if(watcher == null)
                          {
                              childlist 
          = zk.getChildren(groupName, false);
                          }
                          
          else
                          {
                              childlist 
          = zk.getChildren(groupName, watcher);
                          }

                          List
          <String> lastresult = new ArrayList<String>();
                          
          for (String item : childlist)
                          {
                              
          if (item.startsWith(ESCAPE_PREFIX))
                              {
                                  lastresult.add(URLDecoder.decode(item, 
          "UTF-8").substring(3));
                              }
                              
          else
                              {
                                  lastresult.add(item);
                              }
                          }

                          
          return lastresult;
                      }
                      
          catch (Exception e)
                      {
                          log.error(
          "fetch group members error", e);
                          
          return EMPTY_MEMBERS;
                      }
                  }
              }




              GroupMemberCenter主要是把用戶的address信息做一下轉義然后在ZooKeeper中創建一個節點, 注冊時使用 CreateMode.EPHEMERAL 模式, 也就是類似心跳監測, 如果服務掛掉, 那么ZooKeeper會自動刪除此節點.


              為了方便測試, 編寫3個啟動服務的程序來模擬多臺機器, 啟動的都是Hello服務, 只是端口不一樣而已:

              public class HelloServiceServer5Zookeeper1 {
                  
          public static void main(String[] args) throws Exception {
                      
          new HelloServicePublisher5Zookeeper().start("http://localhost:8081/service/Hello"new HelloFirstImpl());
                  }
              }



              其他2個請自己看源碼包.

             
              下面我們來準備Client, 代碼和上篇文章中的一樣, 首先是一個XML:


          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
                 xmlns:xsi
          ="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:jaxws
          ="http://cxf.apache.org/jaxws"
                 xmlns:clustering
          ="http://cxf.apache.org/clustering"
                 xmlns:util
          ="http://www.springframework.org/schema/util"
                 xsi:schemaLocation
          ="
          http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
          >


              
          <bean id="loadBalanceStrategy" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
                  
          <property name="removeFailedEndpoint" value="true" />
              
          </bean>

              
          <bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
                  
          <property name="strategy" ref="loadBalanceStrategy" />
              
          </bean>


              
          <jaxws:client name="helloClient"
                            serviceClass
          ="org.javascud.extensions.cxf.service.Hello"            >
                  
          <jaxws:features>
                      
          <ref bean="loadBalanceFeature" />
                  
          </jaxws:features>
              
          </jaxws:client>

              
          <bean id="zooKeeper" class="org.apache.zookeeper.ZooKeeper">
                  
          <constructor-arg index="0" value="127.0.0.1:2181" />
                  
          <constructor-arg index="1" value="3000" />
                  
          <constructor-arg index="2" ><null/></constructor-arg>
              
          </bean>
          </beans>


              XML沒有寫任何服務的網址, 后面的程序負責更新服務列表. 此XML定義了一個ZooKeeper客戶端, 你可以根據自己的實際情況修改, 例如ZooKeeper本身也可以是負載均衡的 (一般為3臺服務器, 方便投票).

             
              調用的Java代碼如下:



                  ClassPathXmlApplicationContext context
                          
          = new ClassPathXmlApplicationContext(new String[]
                          {
          "org/javascud/extensions/cxf/zookeeper/client/loadbalance_fail_zookeeper.xml"});
                  
          final Hello client = (Hello) context.getBean("helloClient");

                  
          final AbstractLoadBalanceStrategy strategy = (AbstractLoadBalanceStrategy) context.getBean("loadBalanceStrategy");

                  Client myclient 
          = ClientProxy.getClient(client);
                  String address 
          = myclient.getEndpoint().getEndpointInfo().getAddress();
                  System.out.println(address);


                  ZooKeeper zk 
          = (ZooKeeper) context.getBean("zooKeeper");
             
              
          //使用定時刷新的方式更新服務列表: 實際代碼中可以寫一個單獨的類來調用
                  ServiceEndpointsFetcher fetcher = new ServiceEndpointsFetcher();
                  fetcher.setStrategy(strategy);
                  fetcher.setZooKeeper(zk);
                  fetcher.setGroupName(groupName);
                  fetcher.start();

              
          //調用服務
                  for (int i = 1; i <= 1000; i++) {
                      String result1 
          = client.sayHello("Felix" + i);
                      System.out.println(
          "Call " + i + "" + result1);

                      
          int left = strategy.getAlternateAddresses(null).size();
                      System.out.println(
          "================== left " + left + " ===========================");

                      Thread.sleep(
          100);
                  }


              查看上面的代碼可以發現, 我們使用了ServiceEndpointsFetcher來刷新, 間隔固定的時間去獲取最新的服務列表.


              我們還可以采用觀察者方式來更新服務列表:

          /**
           * watcher service from zookeeper.
           *
           * 
          @author Felix Zhang   Date:2010-10-16 01:13
           
          */
          public class ServiceEndpointsWatcher extends ZooKeeperChildrenWatcher {

              
          private AbstractLoadBalanceStrategy strategy;

              
          public void setStrategy(AbstractLoadBalanceStrategy strategy) {
                  
          this.strategy = strategy;
              }

              @Override
              
          protected void updateData(List<String> members) {
                  strategy.setAlternateAddresses(members);
              }
          }


              ZooKeeperChildrenWatcher是一個父類, 調用GroupMemberCenter的代碼來監測ZooKeeper上的對應節點:

          /**
           * a Watcher for monitor zookeeper by getChildren
           *
           * 
          @author Felix Zhang   Date:2010-10-16 14:39
           
          */
          public abstract class ZooKeeperChildrenWatcher implements Watcher {
              
          private ZooKeeper zooKeeper;
              
          private String groupName;
              
          private GroupMemberCenter gmc = null;

              
          public void setZooKeeper(ZooKeeper zooKeeper) {
                  
          this.zooKeeper = zooKeeper;
              }

              
          public void setGroupName(String groupName) {
                  
          this.groupName = groupName;
              }

              @Override
              
          public void process(WatchedEvent event) {
                  fetchAndUpdate();
              }

              
          private void fetchAndUpdate() {
                  
          //get children and register watcher again
                  List<String> members = gmc.fetchGroupMembers(groupName, this);

                  updateData(members);
              }

              
          protected abstract void updateData(List<String> members);

              
          public void init() {
                  
          if (zooKeeper != null) {
                      gmc 
          = new GroupMemberCenter();
                      gmc.setZooKeeper(zooKeeper);

                      fetchAndUpdate();
                  }
              }
          }
             

              調用ServiceEndpointsWatcher的代碼是在Spring的XML中, 當然也可以在單獨程序中調用:

             
              <bean id="serviceEndpointsWatcher"
                    class
          ="org.javascud.extensions.cxf.zookeeper.ServiceEndpointsWatcher"
                      init-method
          ="init">
                  
          <property name="strategy" ref="loadBalanceStrategy" />
                  
          <property name="zooKeeper" ref="zooKeeper" />
                  
          <property name="groupName" value="helloservice" />
              
          </bean>


             
              ok, 下面我們啟動ZooKeeper, 在2181端口. 然后其次啟動三個HelloService: HelloServiceServer5Zookeeper1, HelloServiceServer5Zookeeper2, HelloServiceServer5Zookeeper3, 它們分別監測在8081, 8082, 8083端口, 并且會向ZooKeeper注冊, 你查看用ZooKeeper的客戶端查看 ls /helloservice, 應該可以看到三個節點.

              然后我們運行客戶端代碼 HelloClient5Zookeeper, 在客戶端運行的過程中, 我們可以終止/啟動HelloService, 就可以看到我們的程序會動態地訪問不同的HelloService, 達到了負載均衡的目的.






              注: ServiceEndpointsWatcher 或ServiceEndpointsFetcher 一定現行運行, 否則調用服務的部分會拋出異常, 因為沒有可用的服務地址.


          代碼打包下載: http://cnscud.googlecode.com/files/extensions-20101016.zip
          SVN 源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/ 


          轉載請注明作者和出處 http://scud.blogjava.net
             


          posted on 2010-10-16 19:37 Scud(飛云小俠) 閱讀(4967) 評論(1)  編輯  收藏 所屬分類: SOA

          評論

          # re: 使用ZooKeeper為CXF或其他服務動態更新服務器信息 2012-09-07 15:32 zdonking

          吼吼,沒想到飛哥 業余還研究代碼  回復  更多評論   

          <2012年9月>
          2627282930311
          2345678
          9101112131415
          16171819202122
          23242526272829
          30123456

          導航

          統計

          公告

          文章發布許可
          創造共用協議:署名,非商業,保持一致

          我的郵件
          cnscud # gmail


          常用鏈接

          留言簿(15)

          隨筆分類(113)

          隨筆檔案(103)

          相冊

          友情鏈接

          技術網站

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 通江县| 西安市| 郯城县| 甘德县| 永新县| 东安县| 行唐县| 河南省| 永胜县| 鄯善县| 竹山县| 汤原县| 伊金霍洛旗| 博罗县| 云和县| 桦甸市| 林芝县| 龙川县| 巴林左旗| 江口县| 通城县| 宁津县| 元江| 大悟县| 左云县| 得荣县| 阳城县| 晋宁县| 广宗县| 望江县| 大冶市| 千阳县| 昌平区| 苏尼特左旗| 五华县| 鄂州市| 南雄市| 巴南区| 镇平县| 成武县| 林口县|