badqiu

          XPer
          隨筆 - 46, 文章 - 3, 評論 - 195, 引用 - 0
          數據加載中……

          分布式應用上下文(Distributed ThreadLocal)

          1.問題

          單機應用內,在進程內部,我們可以使用ThreadLocal傳遞應用上下文的方式. 當前的 Spring Secrucity , Spring TransactionManager, Log4J MDC, Struts2 ActionContext等等應用場景隨處可見.

          但在是分布式系統下,由于不是在同一個進程內,所以無法使用ThreadLocal. 那么什么是分布式ThreadLocal呢?就是將一個系統中的ThreadLocal信息可以傳遞至下一個系統,將兩者的調用可以關聯起來。如對應用有一個調用,我們生成一個請求ID (traceId),在后面所有分布式系統調用中,可以通過這個traceId將所有調用關聯起來,這樣查找調用日志都將十分方便.

          2.實現方式

          我們現在使用的通訊協議,一般都包含兩部分:Header,Body. 如 Soap Header,Http Header. 通過自定義Header,可以帶上我們的自定義信息。 然后在服務器端解析Header,再得到自定義信息。那么就可以完成Distributed ThreadLocal的功能。

          如上圖,通過兩個攔截器,client在調用之前,將DistrbiutedThreadLocal中的信息放在soap header中,在服務端方法調用之前,從soap header中取回 DistrbiutedThreadLocal信息。

          3. 實現代碼.

          以下為CXF webservice的實現代碼,一個DistributedThreadLocal及增加了兩個攔截器. hessian 也可以自定義Header,完成傳遞.

          DistributedThreadLocal

          /**
           * 分布式 ThreadLocal, 存放在ThreadLocal中的數據可以傳輸至另外一臺機器上
           * 
          @author badqiu
           
          */
          public class DistributedThreadLocal {
              
          public static String DISTRIBUTED_THREAD_LOCAL_KEY_PREFIX = "tl_";
              
              
          public static ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>();

              
          public static void putAll(Map<String, String> map) {
                  getMap().putAll(map);
              }
              
              
          public static void put(String key, String value) {
                  getMap().put(key, value);
              }

              
          public static String get(String key) {
                  Map
          <String, String> map = threadLocal.get();
                  
          if (map == null)
                      
          return null;
                  
          return (String) map.get(key);
              }

              
          public static Map<String, String> getMap() {
                  Map
          <String, String> map = threadLocal.get();
                  
          if (map == null) {
                      map 
          = new HashMap();
                      threadLocal.set(map);
                  }
                  
          return map;
              }

              
          public static void clear() {
                  threadLocal.set(
          null);
              }

          }

          DistributedThreadLocalInSOAPHeaderInterceptor

          /**
           * 輸入(In)攔截器,用于從 WebService SOAP 的Header中取回DistributedThreadLocal中的信息,并存放在DistributedThreadLocal中
           * 
           * 
          @author badqiu
           
          */
          public class DistributedThreadLocalInSOAPHeaderInterceptor extends AbstractSoapInterceptor {
              
              
          private SAAJInInterceptor saajIn = new SAAJInInterceptor();  
              
              
          public DistributedThreadLocalInSOAPHeaderInterceptor() {  
                  
          super(Phase.PRE_PROTOCOL);  
                  getAfter().add(SAAJInInterceptor.
          class.getName());  
              }  

              
          public void handleMessage(SoapMessage message) throws Fault {
                  SOAPMessage doc 
          = message.getContent(SOAPMessage.class);  
                  
          if (doc == null) {  
                      saajIn.handleMessage(message);  
                      doc 
          = message.getContent(SOAPMessage.class);  
                  }  
                  
                  Map
          <String,String> headers = toHeadersMap(doc);  
                  DistributedThreadLocal.putAll(headers);
                  
              }

              
          private Map toHeadersMap(SOAPMessage doc) {
                  SOAPHeader header 
          = getSOAPHeader(doc);  
                  
          if (header == null) {  
                      
          return new HashMap(0);  
                  } 
                  
                  Map
          <String,String> headersMap = new HashMap();
                  NodeList nodes 
          = header.getChildNodes();
                  
          for(int i=0; i<nodes.getLength(); i++) {  
                      Node item 
          = nodes.item(i);
                      
          if(item.hasChildNodes()) {
                          headersMap.put(item.getLocalName(), item.getFirstChild().getNodeValue());
                      }
                  }
                  
          return headersMap;
              }

              
          private SOAPHeader getSOAPHeader(SOAPMessage doc) {
                  SOAPHeader header;
                  
          try {
                      header 
          = doc.getSOAPHeader();
                  } 
          catch (SOAPException e) {
                      
          throw new RuntimeException(e);
                  }
                  
          return header;
              }

          }

          DistributedThreadLocalOutSOAPHeaderInterceptor

          /**
           * 輸出(Out)攔截器,用于將DistributedThreadLocal中的信息存放在 WebService SOAP 的Header中
           * 
           * 
          @author badqiu
           
          */
          public class DistributedThreadLocalOutSOAPHeaderInterceptor extends AbstractSoapInterceptor {
              
              
          public DistributedThreadLocalOutSOAPHeaderInterceptor() {
                  
          super(Phase.WRITE);
              }

              
          public void handleMessage(SoapMessage message) throws Fault {
                  
                  List
          <Header> headers = message.getHeaders();
                  Map
          <String,String> threadlocalMap = DistributedThreadLocal.getMap();
                  
                  
          for(Map.Entry<String, String> entry : threadlocalMap.entrySet()) {
                      headers.add(getHeader(entry.getKey(), entry.getValue()));
                  }
              }

              
          private Header getHeader(String key, String value) {
                  QName qName 
          = new QName(key);
                  Document document 
          = DOMUtils.createDocument();
                  Element element 
          = document.createElement(key);
                  element.appendChild(document.createTextNode(value));
                  SoapHeader header 
          = new SoapHeader(qName, element);
                  
          return (header);
              }
          }

          CXF spring配置文件:

          server端:

          <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns:jaxws
          ="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
              xsi:schemaLocation
          ="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
              
          default-lazy-init="true">

              
          <description>Apache CXF的Web Service配置</description>

              
          <import resource="classpath:META-INF/cxf/cxf.xml" />
              
          <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />
              
          <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />

              
          <!-- jax-ws endpoint定義  -->
              
          <jaxws:endpoint address="/hello" >
                  
          <jaxws:implementor ref="hello" />
                  
          <jaxws:inInterceptors>
                      
          <bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdInSOAPHeaderInterceptor"/>
                  
          </jaxws:inInterceptors>
              
          </jaxws:endpoint>
              
              
          <!-- WebService的實現Bean定義 -->
              
          <bean id="hello" class="cn.org.rapid_framework.hessian.HessianTest.HelloImpl" />
          </beans>

          client端:

          <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns:jaxws
          ="http://cxf.apache.org/jaxws" xmlns:cxf="http://cxf.apache.org/core"
              xsi:schemaLocation
          ="http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
              
          default-lazy-init="true">
              
          <description>Apache CXF Web Service Client端配置</description>

              
          <jaxws:client id="hello" serviceClass="cn.org.rapid_framework.hessian.HessianTest.Hello"
                  address
          ="http://localhost:8080/service/hello" >
                  
          <jaxws:outInterceptors>
                      
          <bean class="cn.org.rapid_framework.distributed.threadlocal.cfx.TraceIdOutSOAPHeaderInterceptor"/>
                  
          </jaxws:outInterceptors>
              
          </jaxws:client>

          </beans>

          4. 應用場景.

          通過分布式應用上下文,暫時想到的幾個應用場景.

          1. Log4j MDC traceId傳遞. 通過一個traceId,將所有相關的 操作所有的日志信息關聯起來。

          2. sessionId 傳遞, 讓我們的應用也有狀態,可以使用session什么的

          3. Security(username,password)傳遞. 在需要安全調用的地方,避免污染接口,需要顯式的在接口傳遞username,password. 相對應的 WSSecurity也可以走這個通道

          分布式應用上下文的概念,全球首創,歡迎轉載(因為google 搜索不到相關文章,或許早已經有相同的概念了,歡迎提醒我)。

          posted on 2011-01-04 19:56 badqiu 閱讀(2301) 評論(3)  編輯  收藏

          評論

          # re: 分布式應用上下文(Distributed ThreadLocal)[未登錄]  回復  更多評論   

          就是報文傳輸過來,待了threadlocal的信息,然后攔截,再寫入當前threadlocal中,其實很多分布式的概率類似,就是信息共享,但是樓主就是將threadlocal信息共享?可以這樣說?

          恩,攔截器代入threadlocal,思路不錯,有點像servlet用filter處理threadloacal,這里利用了webservice實現了分布式,然后攔截器有點filter的意思。
          2011-01-05 11:24 | garfield

          # re: 分布式應用上下文(Distributed ThreadLocal)  回復  更多評論   

          @garfield

          我另外一個 hassian的實現就是如你所有通過一個 Filter來實現的。
          主要是對應用來說,可以透明的傳遞應用上下文。
          2011-01-05 11:37 | badqiu

          # re: 分布式應用上下文(Distributed ThreadLocal)[未登錄]  回復  更多評論   

          @badqiu
          恩,這樣主要是保證了開發兩端的可以一致的使用,保證來開發的一致性,省去了很多不必要的東西
          2011-01-05 15:35 | garfield

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 乌拉特前旗| 文水县| 洛宁县| 赞皇县| 泰州市| 大余县| 望城县| 普格县| 桂林市| 新化县| 隆回县| 绍兴市| 绥德县| 明溪县| 上犹县| 定州市| 邵阳县| 彩票| 柘荣县| 旅游| 鸡西市| 龙川县| 贵州省| 卢湾区| 确山县| 丽江市| 永平县| 伊宁市| 柏乡县| 阳原县| 海兴县| 本溪市| 包头市| 奈曼旗| 清河县| 类乌齐县| 沅陵县| 益阳市| 黑河市| 门源| 隆昌县|