擴展CXF, 支持LoadBalance負載均衡

          轉載請注明作者和出處 http://scud.blogjava.net


          CXF是一個比較流行的Web Service框架. ( 當然如果追求更高效, 還可以去搜索ice, thrift, protobuff之類的)

          近一個月, 斷斷續(xù)續(xù)地又好好看了看CXF的一些代碼, CXF的文檔還是很欠缺,特別是關于內部實現(xiàn)的東西. 從我的感覺來說, 內部實現(xiàn)還是挺復雜的. Inteceptor, Feature, ConduitSelector 這些概念一大堆, 又差不多可以做類似的事情, 真是讓人頭暈.


          CXF本身提供了一個FailoverFeature, 可以在調用服務出錯時切換到其他服務器, 但是無法做到負載均衡, 我研究了幾天, 在FailoverFeature的基礎上改出來一個LoadBalanceFeature, 當然也同時支持Failover.

          首先我們來看看如何使用CXF的FailoverFeature: (下載示例中包括使用xml和代碼兩種方式, 當然CXF自己還提供了使用wsdl內部定義的方式)

              我們需要先準備一個HelloService, 非常簡單的一個Web Service, 這里不在貼出, 具體可以看下載包
              調用代碼示例:

          package org.javascud.extensions.cxf.testfailover;

          import org.apache.cxf.clustering.FailoverFeature;
          import org.apache.cxf.clustering.RandomStrategy;
          import org.apache.cxf.feature.AbstractFeature;
          import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
          import org.javascud.extensions.cxf.service.Hello;

          import java.util.ArrayList;
          import java.util.List;

          public class HelloServiceFailOverClient
          {
              
          public static void main(String[] args)
              {
                  String helloFirst 
          = "http://localhost:8080/service/Hello";
                  String helloSecond 
          = "http://localhost:8081/service/Hello";
                  String helloThird 
          = "http://localhost:8082/service/Hello";
                  String helloFour 
          = "http://localhost:8083/service/Hello";

                  List
          <String> serviceList = new ArrayList<String>();
                  serviceList.add(helloFirst);
                  serviceList.add(helloSecond);
                  serviceList.add(helloThird);
                  
          //serviceList.add(helloFour);

                  RandomStrategy strategy 
          = new RandomStrategy();
                  strategy.setAlternateAddresses(serviceList);

                  FailoverFeature ff 
          = new FailoverFeature();
                  ff.setStrategy(strategy);

                  JaxWsProxyFactoryBean factory 
          = new JaxWsProxyFactoryBean();

                  List
          <AbstractFeature> features = new ArrayList<AbstractFeature>();
                  features.add(ff);

                  factory.setFeatures(features);
                  factory.initFeatures();

                  factory.setServiceClass(Hello.
          class);
                  
          //factory.setAddress("http://localhost:8080/service/Hello");

                  Hello client 
          = (Hello) factory.create();
                  String result 
          = client.sayHello("felix");
                  System.out.println(
          "result is: " + result);
              }
          }


          在遇到錯誤時可以自動使用下一個服務器, 但是必須要自己設置一個地址, 如果不設置的話也可以, 但是會出錯然后failover.


          下面我們自己來看看我們的 LoadBalanceFeature

          1. 首先我們創(chuàng)建一個LoadBalanceFeature (完全和FailoverFeature一樣)

             Feature是用來定制Server, Client, Bus的一個組件, 具體可以查看AbstractFeature, 我們使用initialize方法來定制Client, 修改Client的Conduit選擇器達到負載均衡的目的.

             LoadBalanceFeature代碼如下:

          /**
           * This feature may be applied to a Client so as to enable
           * load balance , use any compatible endpoint for the target service.
           *
           * 
          @author Felix Zhang   Date:2010-10-3 22:58
           * 
          @see org.apache.cxf.clustering.FailoverFeature
           
          */
          public class LoadBalanceFeature extends AbstractFeature {
              
          private LoadBalanceStrategy loadBalanceStrategy;

              @Override
              
          public void initialize(Client client, Bus bus) {
                  LoadBalanceTargetSelector selector 
          = new LoadBalanceTargetSelector();
                  selector.setEndpoint(client.getEndpoint());
                  selector.setStrategy(getStrategy());
                  client.setConduitSelector(selector);
              }

              
          public void setStrategy(LoadBalanceStrategy strategy) {
                  loadBalanceStrategy 
          = strategy;
              }

              
          public LoadBalanceStrategy getStrategy() {
                  
          return loadBalanceStrategy;
              }

          }




          2. 定制一個LoadBalanceStrategy 負載均衡策略
          負載均衡策略有很多種, 例如隨機選擇, 順序選擇等, FailoverFeature提供了三種策略, 總之很簡單, 我們在這里就先實現(xiàn)隨機策略, 其他的策略都很簡單, 幾行代碼就可以實現(xiàn)了.

              這個類主要用來設置/獲取所有的提供服務的地址列表, 為了方便控制, 我新增了2個選項:
              A: alwaysChangeEndpoint 是否每次請求都切換地址: 如果只有一個客戶端, 可以分擔負載. 缺省為true
              B: removeFailedEndpoint 是否從全局的地址列表中移除失敗服務地址 -- 如果你沒有監(jiān)測服務器狀態(tài)的程序

             關于動態(tài)增刪服務地址
          • 可以使用zookeeper等服務實時監(jiān)測服務器狀態(tài), 或者自己寫程序實現(xiàn), 調用strategy.setAlternateAddresses即可.
          • removeFailedEndpoint 如果設置為true, 但沒有監(jiān)測服務器狀態(tài)的程序, 新增的或者復活的服務器則無法被恢復到地址列表中.
          • 考慮到效率和支持failover, 設置地址列表, 移除地址等沒有同步鎖.
          • 自動移除失敗服務地址時, 目前僅支持手動地址列表, 沒有考慮wsdl中的多服務地址.
          • 后續(xù)我會寫一個使用zookeeper增刪服務地址列表的示例. (最近也在看zookeeper)


             主要的代碼都在AbstractLoadBalanceStrategy 中, 基本和 AbstractStaticFailoverStrategy 一樣, 添加了一個removeAlternateAddress 用于移除失敗的服務地址.

              LoadBalanceStrategy 接口的代碼如下:

          /**
           * Supports pluggable strategies for alternate endpoint selection on
           * load balance.
           * <p/>
           * Random, Retries, Mod (later)
           * <p/>
           * 1. support load balance  2.support fail over.
           *
           * 
          @author Felix Zhang   Date:2010-10-1 18:14
           * 
          @see org.apache.cxf.clustering.FailoverStrategy
           
          */
          public interface LoadBalanceStrategy {

              
          /**
               * Get the alternate endpoints for this invocation.
               *
               * 
          @param exchange the current Exchange
               * 
          @return a failover endpoint if one is available
               
          */
              List
          <Endpoint> getAlternateEndpoints(Exchange exchange);

              
          /**
               * Select one of the alternate endpoints for a retried invocation.
               *
               * 
          @param alternates List of alternate endpoints if available
               * 
          @return the selected endpoint
               
          */
              Endpoint selectAlternateEndpoint(List
          <Endpoint> alternates);

              
          /**
               * Get the alternate addresses for this invocation.
               * These addresses over-ride any addresses specified in the WSDL.
               *
               * 
          @param exchange the current Exchange
               * 
          @return a failover endpoint if one is available
               
          */
              List
          <String> getAlternateAddresses(Exchange exchange);

              
          /**
               * Select one of the alternate addresses for a retried invocation.
               *
               * 
          @param addresses List of alternate addresses if available
               * 
          @return the selected address
               
          */
              String selectAlternateAddress(List
          <String> addresses);

              
          /**
               * should remove failed endpoint or not.
               * only work for user defined addresses list.
               * 
          @return true or false
               
          */
              
          boolean isRemoveFailedEndpoint();

              
          /**
               * change endpoint every time or not.
               * 
          @return boolean
               
          */
              
          boolean isAlwaysChangeEndpoint();

              
          /**
               * remove failed address from list.
               * 
          @param address the failed address
               
          */
              
          void removeAlternateAddress(String address);
          }



              RandomLoadBalanceStrategy繼承自 AbstractLoadBalanceStrategy, 和 RandomStrategy的區(qū)別就是獲取下一個服務地址時并不從列表中移除此地址, 否則就做不到負載均衡了.


          3. 最重要的 LoadBalanceTargetSelector
              A: 這個類比較復雜, 我們?yōu)榱藢崿F(xiàn)負載均衡, 修改了prepare來動態(tài)設置調用的endpoint, 替換策略取決于LoadBalanceStrategy
              主要代碼如下:

                      boolean existsEndpoint = false;
                      
          //check current endpoint is not null
                      Endpoint theEndpoint = exchange.get(Endpoint.class);
                      
          if (theEndpoint.getEndpointInfo().getAddress() != null) {
                          existsEndpoint 
          = true;
                      }

                      Endpoint nextEndpoint;
                      
          if (getStrategy().isAlwaysChangeEndpoint() || !existsEndpoint) {
                          
          //get a endpoint and set to current endpoint
                          Endpoint loadBalanceTarget = getLoadBalanceTarget(exchange);
                          
          if (loadBalanceTarget != null) {
                              logger.info(
          "switch to next target: " + loadBalanceTarget.getEndpointInfo().getAddress());
                              setEndpoint(loadBalanceTarget);

                              
          //update exchange.org.apache.cxf.message.Message.ENDPOINT_ADDRESS --- 不設置這個就用上次的奇怪
                              message.put(Message.ENDPOINT_ADDRESS, loadBalanceTarget.getEndpointInfo().getAddress());
                          }

                          nextEndpoint 
          = loadBalanceTarget;
                      } 
          else {
                          
          //use current endpoint
                          nextEndpoint = theEndpoint;
                      }




             
              B:為了和原有Failover特性兼容, 我們修改了 getFailoverTarget函數(shù), 在此函數(shù)中要移除失敗的服務地址, 因為在之前我們修改了LoadBalanceStrategy, 它在獲取地址時不再移除當前地址, 所以我們需要手動移除.

              部分代碼如下:   
                      String currentAddress = getEndpoint().getEndpointInfo().getAddress();

                      
          //failover should remove current endpoint first, then get next -- 根據(jù)定義的策略來決定是否從全局地址列表中移除
                      if (getStrategy().isRemoveFailedEndpoint()) {
                          logger.warn(
          "remove current failed address: " + currentAddress);
                          
          //remove for client, not for current invocation -- 沒有同步鎖
                          getStrategy().removeAlternateAddress(currentAddress);
                      }

                      
          //remove for current invocation: 當前請求中總是移除失敗服務地址
                      alternateAddresses.remove(currentAddress);

                      String alternateAddress 
          =
                              getStrategy().selectAlternateAddress(alternateAddresses);





          4. 調用實例:

             此處我們采用XML定義方式:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
                 xmlns:xsi
          ="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:jaxws
          ="http://cxf.apache.org/jaxws"
                 xmlns:clustering
          ="http://cxf.apache.org/clustering"
                 xmlns:util
          ="http://www.springframework.org/schema/util"
                 xsi:schemaLocation
          ="
          http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
          >

              
          <util:list id="addressList">
                  
          <value>http://localhost:8081/service/Hello</value>
                  
          <value>http://localhost:8082/service/Hello</value>
                  
          <value>http://localhost:8083/service/Hello</value>
                  
          <value>http://localhost:8086/service/Hello</value>
                  
          <value>http://localhost:8087/service/Hello</value>
                  
          <value>http://localhost:8088/service/Hello</value>
              
          </util:list>

              
          <bean id="SequentialAddresses" class="org.apache.cxf.clustering.SequentialStrategy">
                  
          <property name="alternateAddresses">
                      
          <ref bean="addressList"/>
                  
          </property>
              
          </bean>

              
          <bean id="randomAddresses" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
                  
          <property name="alternateAddresses">
                      
          <ref bean="addressList"/>
                  
          </property>
                  
          <property name="removeFailedEndpoint" value="true" />
              
          </bean>

              
          <bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
                  
          <property name="strategy" ref="randomAddresses" />
              
          </bean>


              
          <jaxws:client name="helloClient"
                            serviceClass
          ="org.javascud.extensions.cxf.service.Hello"            >
                  
          <jaxws:features>
                      
          <ref bean="loadBalanceFeature" />
                  
          </jaxws:features>
              
          </jaxws:client>


          </beans>
           
          8081, 8082, 8083是實際存在的服務, 其他的不存在.


          調用的Java代碼:

          package org.javascud.extensions.cxf.loadbalance;

          import org.apache.cxf.endpoint.Client;
          import org.apache.cxf.frontend.ClientProxy;
          import org.javascud.extensions.cxf.LoadBalanceStrategy;
          import org.javascud.extensions.cxf.service.Hello;
          import org.springframework.context.support.ClassPathXmlApplicationContext;


          public class HelloLoadBalanceAndFailOverClientByXML
          {
              
          public static void main(String[] args)
              {
                  ClassPathXmlApplicationContext context
                          
          = new ClassPathXmlApplicationContext(new String[]
                          {
          "org/javascud/extensions/cxf/loadbalance/loadbalance_fail.xml"});
                  Hello client 
          = (Hello) context.getBean("helloClient");

                  LoadBalanceStrategy strategy 
          = (LoadBalanceStrategy) context.getBean("randomAddresses");

                  Client myclient 
          = ClientProxy.getClient(client);
                  String address 
          = myclient.getEndpoint().getEndpointInfo().getAddress();

                  System.out.println(address);

                  
          for(int i=1; i<=20; i++)
                  {
                      String result1 
          = client.sayHello("Felix" +i);
                      System.out.println(
          "Call " + i +"" + result1);

                      
          int left = strategy.getAlternateAddresses(null).size();
                      System.out.println(
          "================== left " + left + " ===========================");
                  }


              }
          }

              此處僅僅為模擬測試.


          5. 關于測試用例
              沒想好如何寫單元測試, test里面目前都是隨意測試的代碼, 基本照顧到所有功能.

              

          6. 下載
          代碼下載: http://cnscud.googlecode.com/files/extensions-cxf_20101015.zip
          源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/  其中cxf目錄是此文章相關的源碼.

          7. 有任何問題請留言.


          轉載請注明作者和出處 http://scud.blogjava.net



          posted on 2010-10-15 13:01 Scud(飛云小俠) 閱讀(3042) 評論(0)  編輯  收藏 所屬分類: SOA

          <2010年10月>
          262728293012
          3456789
          10111213141516
          17181920212223
          24252627282930
          31123456

          導航

          統(tǒng)計

          公告

          文章發(fā)布許可
          創(chuàng)造共用協(xié)議:署名,非商業(yè),保持一致

          我的郵件
          cnscud # gmail


          常用鏈接

          留言簿(15)

          隨筆分類(113)

          隨筆檔案(103)

          相冊

          友情鏈接

          技術網(wǎng)站

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 浪卡子县| 鹤峰县| 渝中区| 什邡市| 榆树市| 大余县| 宜良县| 旅游| 神池县| 乡宁县| 响水县| 连江县| 康保县| 育儿| 孝昌县| 成安县| 新龙县| 东平县| 新巴尔虎左旗| 霍州市| 仙游县| 南郑县| 教育| 泗洪县| 沙雅县| 读书| 外汇| 抚松县| 翼城县| 蚌埠市| 东阿县| 岑溪市| 四会市| 长宁县| 宝丰县| 周口市| 龙江县| 清徐县| 应用必备| 宁强县| 上栗县|