Walk Your Own Path

          The road ahead is long and far; I will search high and low

           

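          For distributed session cache synchronization, TopLink ships with two transport implementations from Oracle by default, JMS and RMI. Users can also define their own by writing a custom Transport Manager class. For details, see: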



           

          http://download-west.oracle.com/docs/cd/B25221_04/web.1013/b13593/cachun003.htm

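          Oracle Coherence is a framework for building a data grid. So far it cannot be integrated with the native version of TopLink, although it can already be integrated with TopLink Essentials. EclipseLink, the successor to TopLink, has also promised a public interface that lets users plug a third-party cache cluster into EclipseLink as the L2 cache, but this has not been implemented yet. The reason for using Coherence as the transport manager for the TopLink session cache is the belief that Coherence's TCMP cluster protocol is faster than ordinary JMS or RMI communication; this still needs further testing, comparison, and analysis. As long as we cannot use the Coherence data grid as TopLink's L2 cache and cannot tolerate the performance problems of JMS and RMI, a custom Transport Manager is well worth trying.

          Introduction to Coherence: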

          http://wiki.tangosol.com/display/COH/Oracle+Coherence+Knowledge+Base+Home

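          We use Coherence to implement the TopLink coordinated cache. The steps below describe how this demo was built. It is only a demo, but it is a useful guide to writing a custom TopLink cache coordinator.

          1     The demo is built with Maven 2 and Artifactory, and uses Ant to substitute text in files at deployment time.

          2     Coherence configuration

          Coherence needs two configuration files: tangosol-coherence.xml and coherence-cache-config.xml. Users can also supply their own tangosol-coherence-override-dev.xml to override parts of tangosol-coherence.xml, so that each environment gets its own settings. See the Tangosol website for configuration details.

          Configuration of coherence-cache-config.xml: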

           

          <?xml version="1.0"?>
          <!DOCTYPE cache-config SYSTEM "cache-config.dtd">
          <cache-config>
              <caching-scheme-mapping>
                  <cache-mapping>
                      <cache-name>toplinkSyn</cache-name>
                      <scheme-name>DistributedCacheScheme</scheme-name>
                  </cache-mapping>
              </caching-scheme-mapping>
              <caching-schemes>
                  <distributed-scheme>
                      <scheme-name>DistributedCacheScheme</scheme-name>
                      <service-name>DistributedCache</service-name>
                      <backing-map-scheme>
                          <local-scheme>
                              <scheme-ref>DistributedMap</scheme-ref>
                          </local-scheme>
                      </backing-map-scheme>
                      <backup-count>0</backup-count>
                      <autostart>true</autostart>
                  </distributed-scheme>
                  <local-scheme>
                      <scheme-name>DistributedMap</scheme-name>
                      <eviction-policy>LRU</eviction-policy>
                      <high-units>10000</high-units>
                      <expiry-delay>1D</expiry-delay>
                      <flush-delay>1D</flush-delay>
                      <cachestore-scheme></cachestore-scheme>
                  </local-scheme>
              </caching-schemes>
          </cache-config>

           

          Configuration of tangosol-coherence-override-dev.xml:

          <?xml version='1.0'?>
          <!--
              This operational configuration override file is set up for use with Coherence in
              a development mode.
          -->
          <coherence xml-override="/tangosol-coherence-override.xml">
              <cluster-config>
                  <member-identity>
                      <cluster-name system-property="tangosol.coherence.cluster">Toplink Cache Synchronization</cluster-name>
                      <member-name system-property="tangosol.coherence.member">@{coherence.member.name}</member-name>
                      <role-name>cache servers</role-name>
                  </member-identity>
                  <unicast-listener>
                      <well-known-addresses>
                          <socket-address id="1">
                              <address system-property="tangosol.coherence.wka">146.222.51.20</address>
                              <port system-property="tangosol.coherence.wka.port">8088</port>
                          </socket-address>
                          <socket-address id="2">
                              <address system-property="tangosol.coherence.wka">146.222.51.20</address>
                              <port system-property="tangosol.coherence.wka.port">8089</port>
                          </socket-address>
                      </well-known-addresses>
                      <address system-property="tangosol.coherence.localhost">@{coherence.local.address}</address>
                      <port system-property="tangosol.coherence.localport">@{coherence.local.port}</port>
                  </unicast-listener>
                  <authorized-hosts>
                      <host-address></host-address>
                      <host-range>
                          <from-address>146.222.51.0</from-address>
                          <to-address>146.222.51.255</to-address>
                      </host-range>
                  </authorized-hosts>
                  <packet-publisher>
                      <packet-delivery>
                          <timeout-milliseconds>30000</timeout-milliseconds>
                      </packet-delivery>
                  </packet-publisher>
              </cluster-config>
              <logging-config>
                  <destination>stderr</destination>
                  <severity-level system-property="tangosol.coherence.log.level">5</severity-level>
                  <character-limit system-property="tangosol.coherence.log.limit">0</character-limit>
              </logging-config>
          </coherence>
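Several elements in the override file carry a system-property attribute. In Coherence this means that a JVM system property with that name, when set, takes precedence over the value written in the XML. The sketch below mimics that lookup rule in plain Java. The class and method names are hypothetical and this is not Coherence's own implementation, only an illustration of the precedence.

```java
import java.util.Properties;

/** Illustrative only: mimics "a system property wins over the XML value". */
final class OverrideLookup {

    /**
     * Returns the property value if the named property is set and non-blank,
     * otherwise the (trimmed) value that was written in the XML element.
     */
    static String resolve(Properties systemProps, String propertyName, String xmlValue) {
        String override = systemProps.getProperty(propertyName);
        if (override == null || override.trim().isEmpty()) {
            return xmlValue.trim();
        }
        return override.trim();
    }

    public static void main(String[] args) {
        // Stand-in for what -Dtangosol.coherence.localport=8088 would set.
        Properties props = new Properties();
        props.setProperty("tangosol.coherence.localport", "8088");

        // Property is set: the override wins over the XML value.
        System.out.println(resolve(props, "tangosol.coherence.localport", "8089"));
        // Property is not set: the XML value is used.
        System.out.println(resolve(props, "tangosol.coherence.log.level", " 5 "));
    }
}
```

In practice this is why two instances on one machine can share the same file and differ only in what is passed with -D on the command line.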



          The demo uses Ant to replace the contents of the @{} placeholders. The key/value pairs used for the replacement must be given in build.properties, for example:

          coherence.member.name = tts-server2

          coherence.local.address = 146.222.51.20

          coherence.local.port = 8089
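The demo's Ant script itself is not shown, so the following is only a sketch of the equivalent substitution in plain Java: every @{key} token is replaced with the value of key from a Properties object, and unknown tokens are left untouched. The class name PlaceholderFilter is hypothetical.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: expands @{key} tokens from a set of properties. */
final class PlaceholderFilter {

    // Matches @{some.key} and captures the key between the braces.
    private static final Pattern TOKEN = Pattern.compile("@\\{([^}]+)\\}");

    static String expand(String text, Properties values) {
        Matcher matcher = TOKEN.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.getProperty(matcher.group(1));
            // Keep the token as-is when no value is defined for it.
            String replacement = (value != null) ? value.trim() : matcher.group();
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties build = new Properties();
        build.setProperty("coherence.member.name", "tts-server2");
        build.setProperty("coherence.local.port", "8089");

        String template = "<member-name>@{coherence.member.name}</member-name>"
                + "<port>@{coherence.local.port}</port>";
        System.out.println(expand(template, build));
    }
}
```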

          3           TopLink configuration

          The demo uses the example from the TopLink tutorial to show that the results are correct. Download location:

          http://www.oracle.com/technology/products/ias/toplink/doc/1013/main/_html/prt_tut.htm

          4           The demo's configuration file, tts.properties

          #one of jms, rmi, coherence or set it blank

          toplink.cache.type = coherence

          #the name of toplink command channel

          toplin.command.channel = OOCLToplinkCoherence

          The first parameter selects which TopLink session configuration file to use in each case. The second is the name of the TopLink command channel, which uniquely identifies a TopLink cluster.
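How the demo maps the cache type to a session configuration file is not shown. As a rough sketch of one way to do it, the code below reads toplink.cache.type and returns a file name. The file names (sessions-local.xml, sessions-jms.xml and so on) and the class CacheTypeSelector are assumptions made for illustration only.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Locale;
import java.util.Properties;

/** Illustrative only: picks a session configuration file from the cache type. */
final class CacheTypeSelector {

    /** Parses properties text such as the contents of tts.properties. */
    static Properties parse(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read configuration", e);
        }
        return config;
    }

    /** Returns a hypothetical sessions file name for the configured cache type. */
    static String sessionsFileFor(Properties config) {
        String type = config.getProperty("toplink.cache.type", "")
                .trim().toLowerCase(Locale.ROOT);
        if (type.isEmpty()) {
            return "sessions-local.xml"; // blank: no cache coordination
        }
        if (type.equals("jms") || type.equals("rmi") || type.equals("coherence")) {
            return "sessions-" + type + ".xml";
        }
        throw new IllegalArgumentException("Unsupported toplink.cache.type: " + type);
    }

    public static void main(String[] args) {
        Properties config = parse("toplink.cache.type = coherence\n");
        System.out.println(sessionsFileFor(config));
    }
}
```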

           

          5          Main code

          1)      CoherenceTransportManager extends TransportManager to implement the custom Transport Manager class


          package com.oocl.isdc.sha.frm.tts.remotecommand;

          import oracle.toplink.internal.remotecommand.RemoteConnection;
          import oracle.toplink.remotecommand.DiscoveryManager;
          import oracle.toplink.remotecommand.RemoteCommandManager;
          import oracle.toplink.remotecommand.ServiceId;
          import oracle.toplink.remotecommand.TransportManager;

          import com.oocl.isdc.sha.frm.tts.cohererence.cache.CoherenceCache;

          public class CoherenceTransportManager extends TransportManager {

              protected CoherenceCache cache;

              public CoherenceTransportManager(RemoteCommandManager rcm) {
                  this.rcm = rcm;
                  this.initialize();
              }

              public void initialize() {
                  super.initialize();
                  this.cache = new CoherenceCache();
              }

              /**
               * TopLink calls this method when a session is obtained. The
               * connection becomes a listener on the Coherence cache, so it is
               * notified when an object is changed elsewhere.
               */
              public void connectBackToRemote(RemoteConnection connection) {
                  CoherenceRemoteConnection coherenceConnection = (CoherenceRemoteConnection) connection;
                  coherenceConnection.becomeMapListener();
              }

              public void createLocalConnection() {
                  CoherenceRemoteConnection connection = new CoherenceRemoteConnection(rcm, cache);
                  addConnectionToExternalService(connection);
              }

              // This demo creates no per-service connections.
              public RemoteConnection createConnection(ServiceId serviceId) {
                  return null;
              }

              public void removeLocalConnection() {
                  this.localConnection = null;
              }

              public DiscoveryManager createDiscoveryManager() {
                  return new CoherenceDiscoveryManager(rcm);
              }

              public CoherenceCache getCache() {
                  return cache;
              }

              public String getServiceUrl() {
                  return cache.getUrl();
              }
          }


           

          2)        CoherenceRemoteConnection extends RemoteConnection and represents a connection associated with the Transport Manager.


          package com.oocl.isdc.sha.frm.tts.remotecommand;

          import oracle.toplink.exceptions.CommunicationException;
          import oracle.toplink.internal.remotecommand.RemoteConnection;
          import oracle.toplink.remotecommand.Command;
          import oracle.toplink.remotecommand.RemoteCommandManager;

          import com.oocl.isdc.sha.frm.tts.cohererence.cache.CoherenceCache;
          import com.tangosol.util.MapEvent;
          import com.tangosol.util.MapListener;

          public class CoherenceRemoteConnection extends RemoteConnection implements MapListener {

              /** Comment for <code>serialVersionUID</code> */
              private static final long serialVersionUID = 8527315103990557963L;

              private CoherenceCache cache;

              private RemoteCommandManager rcm;

              public CoherenceRemoteConnection(RemoteCommandManager rcm, CoherenceCache cache) {
                  this.serviceId = rcm.getServiceId();
                  this.rcm = rcm;
                  this.cache = cache;
              }

              /**
               * Called back when an object in the TopLink session cache has
               * changed. Puts the change information into the Coherence cache.
               */
              public Object executeCommand(Command command) throws CommunicationException {
                  cache.putCache(command.getServiceId(), command);
                  return null;
              }

              /**
               * Hands a command to the command manager when it belongs to this
               * connection's channel. Null values and other objects are ignored.
               */
              protected void processObject(Object object) {
                  if (object instanceof Command) {
                      Command command = (Command) object;
                      if (command.getServiceId().getChannel().equals(serviceId.getChannel())) {
                          rcm.processCommandFromRemoteConnection(command);
                      }
                  }
              }

              public void close() {
                  this.cache.removeMapListener(this);
              }

              public void becomeMapListener() {
                  this.cache.addMapListener(this);
              }

              public void entryDeleted(MapEvent event) {
                  // Deletions are not propagated in this demo.
              }

              /**
               * Called when an object is inserted into the Coherence cache.
               */
              public void entryInserted(MapEvent event) {
                  processObject(event.getNewValue());
              }

              /**
               * Called when an object in the Coherence cache is updated.
               */
              public void entryUpdated(MapEvent event) {
                  processObject(event.getNewValue());
              }
          }
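The mechanism in CoherenceRemoteConnection is that each member writes its command into a shared map and every member listens for inserts and updates. The sketch below reproduces that flow with plain Java collections so it can be run without TopLink or Coherence. All class names are hypothetical stand-ins, the map is local rather than clustered, and skipping a member's own commands is an assumption of this sketch that the article's code does not make explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative only: listener-based propagation through a shared map. */
final class SharedMapSketch {

    /** Hypothetical stand-in for a command that carries a change set. */
    static final class Command {
        final String channel;
        final String senderId;
        final String payload;

        Command(String channel, String senderId, String payload) {
            this.channel = channel;
            this.senderId = senderId;
            this.payload = payload;
        }
    }

    /** Callback invoked whenever an entry is inserted or updated. */
    interface EntryListener {
        void entryChanged(String key, Command value);
    }

    /** Stand-in for the clustered cache: every put notifies all listeners. */
    static final class ObservableMap {
        private final Map<String, Command> entries = new ConcurrentHashMap<String, Command>();
        private final List<EntryListener> listeners = new CopyOnWriteArrayList<EntryListener>();

        void addListener(EntryListener listener) {
            listeners.add(listener);
        }

        void put(String key, Command value) {
            entries.put(key, value);
            for (EntryListener listener : listeners) {
                listener.entryChanged(key, value);
            }
        }
    }

    /** One application server: publishes its own changes, applies others'. */
    static final class Node implements EntryListener {
        private final String id;
        private final String channel;
        private final ObservableMap map;
        final List<String> applied = new ArrayList<String>();

        Node(String id, String channel, ObservableMap map) {
            this.id = id;
            this.channel = channel;
            this.map = map;
        }

        /** Start listening, comparable to becomeMapListener(). */
        void connect() {
            map.addListener(this);
        }

        /** Publish a local change, comparable to executeCommand(). */
        void publish(String payload) {
            map.put(id, new Command(channel, id, payload));
        }

        @Override
        public void entryChanged(String key, Command command) {
            if (!command.channel.equals(channel)) {
                return; // belongs to a different cluster
            }
            if (command.senderId.equals(id)) {
                return; // our own change (assumption of this sketch)
            }
            applied.add(command.payload);
        }
    }

    public static void main(String[] args) {
        ObservableMap map = new ObservableMap();
        Node server1 = new Node("server1", "OOCLToplinkCoherence", map);
        Node server2 = new Node("server2", "OOCLToplinkCoherence", map);
        Node other = new Node("other", "AnotherChannel", map);
        server1.connect();
        server2.connect();
        other.connect();

        server1.publish("Employee[558] changed");
        System.out.println("server2 applied: " + server2.applied);
        System.out.println("server1 applied: " + server1.applied);
        System.out.println("other applied:   " + other.applied);
    }
}
```

As in the article's code, the key is the sender's id, so each member's entry holds only its most recent command; the listeners react to the update event rather than reading the map later.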


           

          3)        CoherenceSessionHelper adds the transport manager to the current session; it supports both server and database sessions.

          package com.oocl.isdc.sha.frm.tts.remotecommand;

          import oracle.toplink.remotecommand.CommandProcessor;
          import oracle.toplink.remotecommand.RemoteCommandManager;
          import oracle.toplink.sessions.DatabaseSession;
          import oracle.toplink.sessions.Session;
          import oracle.toplink.threetier.Server;

          import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
          import com.oocl.isdc.sha.frm.tts.config.TTSConfigParser;

          public class CoherenceSessionHelper {
              public static Session addCoherenceTransportManagerToSession(Session session) {
                  RemoteCommandManager commandMgr = new RemoteCommandManager((CommandProcessor) session);
                  commandMgr.setChannel((String) TTSConfigParser.getInstance().get(
                          TTSConfigConstants.TOPLINK_COMMAND_CHANNEL_KEY));
                  CoherenceTransportManager tm = new CoherenceTransportManager(commandMgr);
                  tm.setInitialContextFactoryName(TTSConfigConstants.TTS_CONTEXT_FACTOYR_NAME);
                  commandMgr.setUrl(tm.getServiceUrl());
                  commandMgr.setTransportManager(tm);
                  tm.setShouldRemoveConnectionOnError(true);
                  if (session instanceof Server) {
                      ((Server) session).setCommandManager(commandMgr);
                      ((Server) session).setShouldPropagateChanges(true);
                      ((Server) session).getCommandManager().initialize();
                  } else if (session instanceof DatabaseSession) {
                      ((DatabaseSession) session).setCommandManager(commandMgr);
                      ((DatabaseSession) session).setShouldPropagateChanges(true);
                      ((DatabaseSession) session).getCommandManager().initialize();
                  } else {
                      throw new IllegalArgumentException("Session must be a server or database session");
                  }
                  return session;
              }
          }


           

          4)        CoherenceCache: a custom wrapper around a Coherence NamedCache

          package com.oocl.isdc.sha.frm.tts.cohererence.cache;

          import java.net.InetAddress;
          import java.util.Collection;

          import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
          import com.tangosol.net.CacheFactory;
          import com.tangosol.net.Cluster;
          import com.tangosol.net.Member;
          import com.tangosol.net.NamedCache;
          import com.tangosol.net.Service;
          import com.tangosol.util.MapListener;

          public class CoherenceCache {

              private NamedCache namedCache;

              public CoherenceCache() {
                  // "toplinkSyn" is the cache name mapped in coherence-cache-config.
                  this.namedCache = CacheFactory.getCache("toplinkSyn");
              }

              public void putCache(Object key, Object value) {
                  namedCache.put(key, value);
              }

              public Object retrieveCache(Object key) {
                  return namedCache.get(key);
              }

              public NamedCache getNamedCache() {
                  return namedCache;
              }

              public void addMapListener(MapListener listener) {
                  this.namedCache.addMapListener(listener);
              }

              public void removeMapListener(MapListener listener) {
                  this.namedCache.removeMapListener(listener);
              }

              @SuppressWarnings("unchecked")
              public Collection retrieveCacheAll() {
                  return this.namedCache.values();
              }

              /** Builds the service URL from the local member's address and port. */
              public String getUrl() {
                  Member member = getLocalMember();
                  InetAddress address = member.getAddress();
                  String ipAddress = address.getHostAddress();
                  int port = member.getPort();
                  StringBuilder sb = new StringBuilder(TTSConfigConstants.TOPLINK_SERVICEID_PREFIX)
                          .append(ipAddress).append(":").append(port);
                  return sb.toString();
              }

              public Member getLocalMember() {
                  Service service = namedCache.getCacheService();
                  Cluster cluster = service.getCluster();
                  Member member = cluster.getLocalMember();
                  return member;
              }
          }
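The value of TTSConfigConstants.TOPLINK_SERVICEID_PREFIX is not shown in the article. The TopLink log further down prints the service URL as tcmp://146.222.51.20:8089, so the prefix is presumably "tcmp://". Under that assumption, getUrl() amounts to the following (the class name ServiceUrl is hypothetical):

```java
/** Illustrative only: the string that getUrl() is assumed to produce. */
final class ServiceUrl {

    // Assumed value of TOPLINK_SERVICEID_PREFIX, inferred from the log output.
    static final String ASSUMED_PREFIX = "tcmp://";

    static String build(String prefix, String hostAddress, int port) {
        return new StringBuilder(prefix)
                .append(hostAddress).append(':').append(port)
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(build(ASSUMED_PREFIX, "146.222.51.20", 8089));
    }
}
```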


           

          5)        CoherenceDiscoveryManager extends DiscoveryManager, mainly overriding the startDiscovery and stopDiscovery methods

          package com.oocl.isdc.sha.frm.tts.remotecommand;

          import oracle.toplink.exceptions.ValidationException;
          import oracle.toplink.remotecommand.DiscoveryManager;
          import oracle.toplink.remotecommand.RemoteCommandManager;

          public class CoherenceDiscoveryManager extends DiscoveryManager {

              public CoherenceDiscoveryManager(RemoteCommandManager rcm) {
                  super(rcm);
              }

              public RemoteCommandManager getRemoteCommandManager() {
                  return this.rcm;
              }

              public boolean isDiscoveryStopped() {
                  throw ValidationException.operationNotSupported("isDiscoveryStopped");
              }

              public void startDiscovery() {
                  ((CoherenceTransportManager) rcm.getTransportManager()).createLocalConnection();
              }

              /**
               * We must implement this method, and we keep it blank
               */
              public void stopDiscovery() {
              }

              public void setAnnouncementDelay(int millisecondsToDelay) {
                  throw ValidationException.operationNotSupported("setAnnouncementDelay");
              }

              public int getAnnouncementDelay() {
                  throw ValidationException.operationNotSupported("getAnnouncementDelay");
              }

              public String getMulticastGroupAddress() {
                  throw ValidationException.operationNotSupported("getMulticastGroupAddress");
              }

              public void setMulticastGroupAddress(String address) {
                  throw ValidationException.operationNotSupported("setMulticastGroupAddress");
              }

              public void setMulticastPort(int port) {
                  throw ValidationException.operationNotSupported("setMulticastPort");
              }

              public int getMulticastPort() {
                  throw ValidationException.operationNotSupported("getMulticastPort");
              }
          }
           

          6           Run mvn clean install from the command line to deploy the demo. The OC4J instance to deploy to must be specified in the pom.xml under the ear module.

              <properties>

                 <home.j2ee>D:/oc4j_extended_101310/j2ee/home</home.j2ee>

                 <oc4j.host>localhost</oc4j.host>

                 <rmi.port>23791</rmi.port>

                 <deploy.username>oc4jadmin</deploy.username>

                 <deploy.password>welcome</deploy.password>

              </properties>

          Because we want to test cache synchronization, the demo must also be deployed to a second OC4J instance. Change build.properties to:

          coherence.member.name = tts-server1

          coherence.local.address = 146.222.51.20

          coherence.local.port = 8088

          and the pom.xml under ear to:

          <properties>

                 <home.j2ee>C:/oc4j_extended_101310/j2ee/home</home.j2ee>

                 <oc4j.host>localhost</oc4j.host>

                 <rmi.port>23792</rmi.port>

                 <deploy.username>oc4jadmin</deploy.username>

                 <deploy.password>welcome</deploy.password>

              </properties>

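          7           Access the demo through a browser and check the cache behavior. When we modify an employee's data on one OC4J instance, the change shows up immediately on the other OC4J instance, and the TopLink log shows the corresponding employee merge. (The demo's cache and cache synchronization policies mostly use TopLink's default configuration.) Employee uses optimistic locking. Without cache synchronization, the other OC4J instance, that is, the other server, still holds a stale cached copy, so modifying the same employee there raises an optimistic lock exception and the change is rolled back. With cache synchronization, the other server gets the latest data without going directly to the database, and that data is consistent with the database, which greatly reduces how often optimistic lock failures occur. To make better use of cache synchronization we also need to define a cache invalidation mechanism, and there are many other cache strategies for avoiding stale data.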
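The interaction between a per-server cache and optimistic locking can be reproduced in a few lines. The sketch below is not TopLink code: Database, Server and Row are hypothetical stand-ins, the version check imitates an UPDATE ... WHERE version = ? statement, and merge() plays the role of the change set received from the other server.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: stale caches versus optimistic locking. */
final class OptimisticLockSketch {

    /** Immutable employee row with a version column. */
    static final class Row {
        final String name;
        final int version;

        Row(String name, int version) {
            this.name = name;
            this.version = version;
        }
    }

    /** Stand-in for the shared database table. */
    static final class Database {
        private final Map<Integer, Row> rows = new HashMap<Integer, Row>();

        void insert(int id, String name) {
            rows.put(id, new Row(name, 1));
        }

        Row read(int id) {
            return rows.get(id);
        }

        /** Succeeds only when the caller's version matches the stored one. */
        boolean update(int id, String newName, int expectedVersion) {
            Row current = rows.get(id);
            if (current == null || current.version != expectedVersion) {
                return false; // optimistic lock failure
            }
            rows.put(id, new Row(newName, expectedVersion + 1));
            return true;
        }
    }

    /** Stand-in for one application server with its own session cache. */
    static final class Server {
        private final Database db;
        private final Map<Integer, Row> cache = new HashMap<Integer, Row>();

        Server(Database db) {
            this.db = db;
        }

        /** Returns the cached row, reading the database only on a miss. */
        Row load(int id) {
            Row row = cache.get(id);
            if (row == null) {
                row = db.read(id);
                if (row != null) {
                    cache.put(id, row);
                }
            }
            return row;
        }

        /** Updates using the cached version; false means the lock check failed. */
        boolean rename(int id, String newName) {
            Row cached = load(id);
            if (cached == null || !db.update(id, newName, cached.version)) {
                return false;
            }
            cache.put(id, new Row(newName, cached.version + 1));
            return true;
        }

        /** Applies a change received from another server. */
        void merge(int id, Row latest) {
            cache.put(id, latest);
        }
    }

    public static void main(String[] args) {
        Database db = new Database();
        db.insert(558, "Bob");
        Server server1 = new Server(db);
        Server server2 = new Server(db);
        server1.load(558);
        server2.load(558);

        System.out.println("server1 update: " + server1.rename(558, "Alice"));
        System.out.println("server2 update with stale cache: " + server2.rename(558, "Carol"));

        server2.merge(558, server1.load(558)); // cache synchronization
        System.out.println("server2 update after merge: " + server2.rename(558, "Carol"));
    }
}
```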

          Toplink Log

          [TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received remote command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]

          [TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Executing command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]

          [TopLink Finer]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received updates from Remote Server

          [TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Merging com.oocl.isdc.sha.frm.tts.model.Employee: [558] from remote server

          Page demonstration:





           

          The URLs in the two screenshots above show two different OC4J servers. Both use the same database and run the same application. Modifying data on server1:







          posted on 2008-05-24 17:37 叱咤紅人  Categories: Java Persistence, Transaction and ORM; Oracle