擴(kuò)展CXF, 支持LoadBalance負(fù)載均衡

          轉(zhuǎn)載請(qǐng)注明作者和出處 http://scud.blogjava.net


          CXF是一個(gè)比較流行的Web Service框架. ( 當(dāng)然如果追求更高效, 還可以去搜索ice, thrift, protobuff之類的)

          近一個(gè)月, 斷斷續(xù)續(xù)地又好好看了看CXF的一些代碼, CXF的文檔還是很欠缺,特別是關(guān)于內(nèi)部實(shí)現(xiàn)的東西. 從我的感覺來說, 內(nèi)部實(shí)現(xiàn)還是挺復(fù)雜的. Inteceptor, Feature, ConduitSelector 這些概念一大堆, 又差不多可以做類似的事情, 真是讓人頭暈.


          CXF本身提供了一個(gè)FailoverFeature, 可以在調(diào)用服務(wù)出錯(cuò)時(shí)切換到其他服務(wù)器, 但是無法做到負(fù)載均衡, 我研究了幾天, 在FailoverFeature的基礎(chǔ)上改出來一個(gè)LoadBalanceFeature, 當(dāng)然也同時(shí)支持Failover.

          首先我們來看看如何使用CXF的FailoverFeature: (下載示例中包括使用xml和代碼兩種方式, 當(dāng)然CXF自己還提供了使用wsdl內(nèi)部定義的方式)

              我們需要先準(zhǔn)備一個(gè)HelloService, 非常簡(jiǎn)單的一個(gè)Web Service, 這里不在貼出, 具體可以看下載包
              調(diào)用代碼示例:

          package org.javascud.extensions.cxf.testfailover;

          import org.apache.cxf.clustering.FailoverFeature;
          import org.apache.cxf.clustering.RandomStrategy;
          import org.apache.cxf.feature.AbstractFeature;
          import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
          import org.javascud.extensions.cxf.service.Hello;

          import java.util.ArrayList;
          import java.util.List;

          public class HelloServiceFailOverClient
          {
              
          public static void main(String[] args)
              {
                  String helloFirst 
          = "http://localhost:8080/service/Hello";
                  String helloSecond 
          = "http://localhost:8081/service/Hello";
                  String helloThird 
          = "http://localhost:8082/service/Hello";
                  String helloFour 
          = "http://localhost:8083/service/Hello";

                  List
          <String> serviceList = new ArrayList<String>();
                  serviceList.add(helloFirst);
                  serviceList.add(helloSecond);
                  serviceList.add(helloThird);
                  
          //serviceList.add(helloFour);

                  RandomStrategy strategy 
          = new RandomStrategy();
                  strategy.setAlternateAddresses(serviceList);

                  FailoverFeature ff 
          = new FailoverFeature();
                  ff.setStrategy(strategy);

                  JaxWsProxyFactoryBean factory 
          = new JaxWsProxyFactoryBean();

                  List
          <AbstractFeature> features = new ArrayList<AbstractFeature>();
                  features.add(ff);

                  factory.setFeatures(features);
                  factory.initFeatures();

                  factory.setServiceClass(Hello.
          class);
                  
          //factory.setAddress("http://localhost:8080/service/Hello");

                  Hello client 
          = (Hello) factory.create();
                  String result 
          = client.sayHello("felix");
                  System.out.println(
          "result is: " + result);
              }
          }


          在遇到錯(cuò)誤時(shí)可以自動(dòng)使用下一個(gè)服務(wù)器, 但是必須要自己設(shè)置一個(gè)地址, 如果不設(shè)置的話也可以, 但是會(huì)出錯(cuò)然后failover.


          下面我們自己來看看我們的 LoadBalanceFeature

          1. 首先我們創(chuàng)建一個(gè)LoadBalanceFeature (完全和FailoverFeature一樣)

             Feature是用來定制Server, Client, Bus的一個(gè)組件, 具體可以查看AbstractFeature, 我們使用initialize方法來定制Client, 修改Client的Conduit選擇器達(dá)到負(fù)載均衡的目的.

             LoadBalanceFeature代碼如下:

          /**
           * This feature may be applied to a Client so as to enable
           * load balance , use any compatible endpoint for the target service.
           *
           * 
          @author Felix Zhang   Date:2010-10-3 22:58
           * 
          @see org.apache.cxf.clustering.FailoverFeature
           
          */
          public class LoadBalanceFeature extends AbstractFeature {
              
          private LoadBalanceStrategy loadBalanceStrategy;

              @Override
              
          public void initialize(Client client, Bus bus) {
                  LoadBalanceTargetSelector selector 
          = new LoadBalanceTargetSelector();
                  selector.setEndpoint(client.getEndpoint());
                  selector.setStrategy(getStrategy());
                  client.setConduitSelector(selector);
              }

              
          public void setStrategy(LoadBalanceStrategy strategy) {
                  loadBalanceStrategy 
          = strategy;
              }

              
          public LoadBalanceStrategy getStrategy() {
                  
          return loadBalanceStrategy;
              }

          }




          2. 定制一個(gè)LoadBalanceStrategy 負(fù)載均衡策略
          負(fù)載均衡策略有很多種, 例如隨機(jī)選擇, 順序選擇等, FailoverFeature提供了三種策略, 總之很簡(jiǎn)單, 我們?cè)谶@里就先實(shí)現(xiàn)隨機(jī)策略, 其他的策略都很簡(jiǎn)單, 幾行代碼就可以實(shí)現(xiàn)了.

              這個(gè)類主要用來設(shè)置/獲取所有的提供服務(wù)的地址列表, 為了方便控制, 我新增了2個(gè)選項(xiàng):
              A: alwaysChangeEndpoint 是否每次請(qǐng)求都切換地址: 如果只有一個(gè)客戶端, 可以分擔(dān)負(fù)載. 缺省為true
              B: removeFailedEndpoint 是否從全局的地址列表中移除失敗服務(wù)地址 -- 如果你沒有監(jiān)測(cè)服務(wù)器狀態(tài)的程序

             關(guān)于動(dòng)態(tài)增刪服務(wù)地址
          • 可以使用zookeeper等服務(wù)實(shí)時(shí)監(jiān)測(cè)服務(wù)器狀態(tài), 或者自己寫程序?qū)崿F(xiàn), 調(diào)用strategy.setAlternateAddresses即可.
          • removeFailedEndpoint 如果設(shè)置為true, 但沒有監(jiān)測(cè)服務(wù)器狀態(tài)的程序, 新增的或者復(fù)活的服務(wù)器則無法被恢復(fù)到地址列表中.
          • 考慮到效率和支持failover, 設(shè)置地址列表, 移除地址等沒有同步鎖.
          • 自動(dòng)移除失敗服務(wù)地址時(shí), 目前僅支持手動(dòng)地址列表, 沒有考慮wsdl中的多服務(wù)地址.
          • 后續(xù)我會(huì)寫一個(gè)使用zookeeper增刪服務(wù)地址列表的示例. (最近也在看zookeeper)


             主要的代碼都在AbstractLoadBalanceStrategy 中, 基本和 AbstractStaticFailoverStrategy 一樣, 添加了一個(gè)removeAlternateAddress 用于移除失敗的服務(wù)地址.

              LoadBalanceStrategy 接口的代碼如下:

          /**
           * Supports pluggable strategies for alternate endpoint selection on
           * load balance.
           * <p/>
           * Random, Retries, Mod (later)
           * <p/>
           * 1. support load balance  2.support fail over.
           *
           * 
          @author Felix Zhang   Date:2010-10-1 18:14
           * 
          @see org.apache.cxf.clustering.FailoverStrategy
           
          */
          public interface LoadBalanceStrategy {

              
          /**
               * Get the alternate endpoints for this invocation.
               *
               * 
          @param exchange the current Exchange
               * 
          @return a failover endpoint if one is available
               
          */
              List
          <Endpoint> getAlternateEndpoints(Exchange exchange);

              
          /**
               * Select one of the alternate endpoints for a retried invocation.
               *
               * 
          @param alternates List of alternate endpoints if available
               * 
          @return the selected endpoint
               
          */
              Endpoint selectAlternateEndpoint(List
          <Endpoint> alternates);

              
          /**
               * Get the alternate addresses for this invocation.
               * These addresses over-ride any addresses specified in the WSDL.
               *
               * 
          @param exchange the current Exchange
               * 
          @return a failover endpoint if one is available
               
          */
              List
          <String> getAlternateAddresses(Exchange exchange);

              
          /**
               * Select one of the alternate addresses for a retried invocation.
               *
               * 
          @param addresses List of alternate addresses if available
               * 
          @return the selected address
               
          */
              String selectAlternateAddress(List
          <String> addresses);

              
          /**
               * should remove failed endpoint or not.
               * only work for user defined addresses list.
               * 
          @return true or false
               
          */
              
          boolean isRemoveFailedEndpoint();

              
          /**
               * change endpoint every time or not.
               * 
          @return boolean
               
          */
              
          boolean isAlwaysChangeEndpoint();

              
          /**
               * remove failed address from list.
               * 
          @param address the failed address
               
          */
              
          void removeAlternateAddress(String address);
          }



              RandomLoadBalanceStrategy繼承自 AbstractLoadBalanceStrategy, 和 RandomStrategy的區(qū)別就是獲取下一個(gè)服務(wù)地址時(shí)并不從列表中移除此地址, 否則就做不到負(fù)載均衡了.


          3. 最重要的 LoadBalanceTargetSelector
              A: 這個(gè)類比較復(fù)雜, 我們?yōu)榱藢?shí)現(xiàn)負(fù)載均衡, 修改了prepare來動(dòng)態(tài)設(shè)置調(diào)用的endpoint, 替換策略取決于LoadBalanceStrategy
              主要代碼如下:

                      boolean existsEndpoint = false;
                      
          //check current endpoint is not null
                      Endpoint theEndpoint = exchange.get(Endpoint.class);
                      
          if (theEndpoint.getEndpointInfo().getAddress() != null) {
                          existsEndpoint 
          = true;
                      }

                      Endpoint nextEndpoint;
                      
          if (getStrategy().isAlwaysChangeEndpoint() || !existsEndpoint) {
                          
          //get a endpoint and set to current endpoint
                          Endpoint loadBalanceTarget = getLoadBalanceTarget(exchange);
                          
          if (loadBalanceTarget != null) {
                              logger.info(
          "switch to next target: " + loadBalanceTarget.getEndpointInfo().getAddress());
                              setEndpoint(loadBalanceTarget);

                              
          //update exchange.org.apache.cxf.message.Message.ENDPOINT_ADDRESS --- 不設(shè)置這個(gè)就用上次的奇怪
                              message.put(Message.ENDPOINT_ADDRESS, loadBalanceTarget.getEndpointInfo().getAddress());
                          }

                          nextEndpoint 
          = loadBalanceTarget;
                      } 
          else {
                          
          //use current endpoint
                          nextEndpoint = theEndpoint;
                      }




             
              B:為了和原有Failover特性兼容, 我們修改了 getFailoverTarget函數(shù), 在此函數(shù)中要移除失敗的服務(wù)地址, 因?yàn)樵谥拔覀冃薷牧薒oadBalanceStrategy, 它在獲取地址時(shí)不再移除當(dāng)前地址, 所以我們需要手動(dòng)移除.

              部分代碼如下:   
                      String currentAddress = getEndpoint().getEndpointInfo().getAddress();

                      
          //failover should remove current endpoint first, then get next -- 根據(jù)定義的策略來決定是否從全局地址列表中移除
                      if (getStrategy().isRemoveFailedEndpoint()) {
                          logger.warn(
          "remove current failed address: " + currentAddress);
                          
          //remove for client, not for current invocation -- 沒有同步鎖
                          getStrategy().removeAlternateAddress(currentAddress);
                      }

                      
          //remove for current invocation: 當(dāng)前請(qǐng)求中總是移除失敗服務(wù)地址
                      alternateAddresses.remove(currentAddress);

                      String alternateAddress 
          =
                              getStrategy().selectAlternateAddress(alternateAddresses);





          4. 調(diào)用實(shí)例:

             此處我們采用XML定義方式:
          <?xml version="1.0" encoding="UTF-8"?>
          <beans xmlns="http://www.springframework.org/schema/beans"
                 xmlns:xsi
          ="http://www.w3.org/2001/XMLSchema-instance"
                 xmlns:jaxws
          ="http://cxf.apache.org/jaxws"
                 xmlns:clustering
          ="http://cxf.apache.org/clustering"
                 xmlns:util
          ="http://www.springframework.org/schema/util"
                 xsi:schemaLocation
          ="
          http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
          http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
          >

              
          <util:list id="addressList">
                  
          <value>http://localhost:8081/service/Hello</value>
                  
          <value>http://localhost:8082/service/Hello</value>
                  
          <value>http://localhost:8083/service/Hello</value>
                  
          <value>http://localhost:8086/service/Hello</value>
                  
          <value>http://localhost:8087/service/Hello</value>
                  
          <value>http://localhost:8088/service/Hello</value>
              
          </util:list>

              
          <bean id="SequentialAddresses" class="org.apache.cxf.clustering.SequentialStrategy">
                  
          <property name="alternateAddresses">
                      
          <ref bean="addressList"/>
                  
          </property>
              
          </bean>

              
          <bean id="randomAddresses" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
                  
          <property name="alternateAddresses">
                      
          <ref bean="addressList"/>
                  
          </property>
                  
          <property name="removeFailedEndpoint" value="true" />
              
          </bean>

              
          <bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
                  
          <property name="strategy" ref="randomAddresses" />
              
          </bean>


              
          <jaxws:client name="helloClient"
                            serviceClass
          ="org.javascud.extensions.cxf.service.Hello"            >
                  
          <jaxws:features>
                      
          <ref bean="loadBalanceFeature" />
                  
          </jaxws:features>
              
          </jaxws:client>


          </beans>
           
          8081, 8082, 8083是實(shí)際存在的服務(wù), 其他的不存在.


          調(diào)用的Java代碼:

          package org.javascud.extensions.cxf.loadbalance;

          import org.apache.cxf.endpoint.Client;
          import org.apache.cxf.frontend.ClientProxy;
          import org.javascud.extensions.cxf.LoadBalanceStrategy;
          import org.javascud.extensions.cxf.service.Hello;
          import org.springframework.context.support.ClassPathXmlApplicationContext;


          public class HelloLoadBalanceAndFailOverClientByXML
          {
              
          public static void main(String[] args)
              {
                  ClassPathXmlApplicationContext context
                          
          = new ClassPathXmlApplicationContext(new String[]
                          {
          "org/javascud/extensions/cxf/loadbalance/loadbalance_fail.xml"});
                  Hello client 
          = (Hello) context.getBean("helloClient");

                  LoadBalanceStrategy strategy 
          = (LoadBalanceStrategy) context.getBean("randomAddresses");

                  Client myclient 
          = ClientProxy.getClient(client);
                  String address 
          = myclient.getEndpoint().getEndpointInfo().getAddress();

                  System.out.println(address);

                  
          for(int i=1; i<=20; i++)
                  {
                      String result1 
          = client.sayHello("Felix" +i);
                      System.out.println(
          "Call " + i +"" + result1);

                      
          int left = strategy.getAlternateAddresses(null).size();
                      System.out.println(
          "================== left " + left + " ===========================");
                  }


              }
          }

              此處僅僅為模擬測(cè)試.


          5. 關(guān)于測(cè)試用例
              沒想好如何寫單元測(cè)試, test里面目前都是隨意測(cè)試的代碼, 基本照顧到所有功能.

              

          6. 下載
          代碼下載: http://cnscud.googlecode.com/files/extensions-cxf_20101015.zip
          源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/  其中cxf目錄是此文章相關(guān)的源碼.

          7. 有任何問題請(qǐng)留言.


          轉(zhuǎn)載請(qǐng)注明作者和出處 http://scud.blogjava.net



          posted on 2010-10-15 13:01 Scud(飛云小俠) 閱讀(3048) 評(píng)論(0)  編輯  收藏 所屬分類: SOA

          <2010年10月>
          262728293012
          3456789
          10111213141516
          17181920212223
          24252627282930
          31123456

          導(dǎo)航

          統(tǒng)計(jì)

          公告

          文章發(fā)布許可
          創(chuàng)造共用協(xié)議:署名,非商業(yè),保持一致

          我的郵件
          cnscud # gmail


          常用鏈接

          留言簿(15)

          隨筆分類(113)

          隨筆檔案(103)

          相冊(cè)

          友情鏈接

          技術(shù)網(wǎng)站

          搜索

          積分與排名

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 信宜市| 龙山县| 天峻县| 石楼县| 上杭县| 和田市| 衡阳市| 中西区| 双柏县| 乡宁县| 张家港市| 拉孜县| 同仁县| 临漳县| 大港区| 建昌县| 长海县| 萍乡市| 苏尼特左旗| 竹溪县| 淮滨县| 建昌县| 彭阳县| 淅川县| 柳河县| 古浪县| 始兴县| 泉州市| 屯留县| 盐津县| 东平县| 阿图什市| 泰安市| 夹江县| 怀安县| 寿光市| 孝感市| 华宁县| 临夏县| 城市| 东乌珠穆沁旗|