posts - 42,comments - 83,trackbacks - 0
                  今天有人提出了一個詭異的要求,要求在全局事務中執行多線程操作。他們全局事務中涉及兩個數據庫中的多個表,如果單線程那么走完,相應時間上不滿足要求,說白了就是比較慢,于是提出了這樣的要求。從JTA的規范來看,transaction(TX)和thread是密切相關的,TX一般是不能在應用線程間傳遞的, 即我主線程起一個全局事務,然后我把這個事務傳遞給其他我新起的線程,單純的變量傳遞沒問題,但這個事務是不能被transaction manager(TM)識別的,TM對TX的管理有他自己的方式。從weblogic的實現來看,TX被放在當前線程的threadlocal中,普通應用線程不存在這樣的結構,所以簡單的變量傳遞,對于TM而言是沒有意義的。那么到底有沒有方法實現上面的需求的,我做了些測試,使用weblogic內部的一些API可以實現這個需求。下面我們就來看看實現中的幾個要點: :)

                  1:上面說了,簡單的變量傳遞對于weblogic的TM是沒有意義的。TM判斷事務上下文(transaction context)的時候,會從當前線程的threadlocal檢查,如果沒有,則說明當前線程沒有和任何TX關聯。那么我們如何將我們手里的TX放入當前線程的threadlocal呢? weblogic的ExecuteThread是我們需要的那種線程,但它是final的,我們不能繼承它,只能繼承它的父類了,也就是weblogic.kernel.AuditableThread。

                  2:我們有繼承了AuditableThread,那么我們怎么把TX放入它的threadlocal中呢?這個可以通過weblogic的TM實現中的一些API來實現,具體到這個類就是weblogic.transaction.internal.TransactionManagerImpl。比如interResume(tx),internalSuspend()。由于這個API不是package protect的,我們自己的類必須也位于weblogic.transaction.internal這個包中。interResume(tx),用于將當前線程和指定的TX做關聯,而internalSuspend()恰恰相反,它用于解除這種關聯。

                  3:因為涉及到多線程,主線程需要決定何時提交或回滾事務,這個我們要自己要實現一個線程結果檢查的方法(checkCompletion())。

                   下面就是我自己實現的測試代碼,在Weblogic81測試沒有問題。
            1 package weblogic.transaction.internal;
            2 
            3 import weblogic.transaction.TxHelper;
            4 import weblogic.transaction.internal.TransactionManagerImpl;
            5 import javax.transaction.Transaction;
            6 import java.util.ArrayList;
            7 
            8 public class DriverTest {
            9 
           10     private static String INITIAL_CONTEXT_FACTORY = "weblogic.jndi.WLInitialContextFactory"
           11     private static String PROVIDER_URL = "t3://localhost:8001";  
           12     private static String SQL_INSERT = "insert into test values(?)";
           13     private static String ANO_SQL_INSERT = "insert into test1 values(?)";
           14     
           15     public static void main(String args[])
           16     {
           17         DriverTest test = new DriverTest();
           18         test.multiThreadXATest();
           19     }
           20     
           21     private Connection getConnection(String url, String dsName) throws NamingException, SQLException
           22     {
           23         InitialContext ctx = initializeEnv(url);
           24         DataSource ds = (DataSource)ctx.lookup(dsName);
           25         ctx.close();
           26         return ds.getConnection();
           27     }
           28     
           29     private UserTransaction getUserTransaction() throws NamingException, SQLException
           30     {
           31         InitialContext ctx = initializeEnv(null);
           32         return (UserTransaction)ctx.lookup("javax/transaction/UserTransaction");
           33     }
           34     
           35     private InitialContext initializeEnv(String url) throws NamingException
           36     {
           37         Properties prop = new Properties();
           38         if(url == null)
           39             prop.put(Context.PROVIDER_URL, PROVIDER_URL);
           40         else
           41             prop.put(Context.PROVIDER_URL, url);
           42         prop.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
           43         return new InitialContext(prop);
           44     }
           45     
           46     private void executeInsertInPSMT(Connection conn, String sql)
           47     {
           48         PreparedStatement pstmt = null;
           49         try{
           50             pstmt = conn.prepareStatement(sql);
           51             pstmt.setString(1"data_to_insert");
           52             pstmt.executeUpdate();
           53             pstmt.close();
           54         }catch(SQLException e){
           55                 e.printStackTrace();
           56         }    
           57     }
           58     
           59     public void multiThreadXATest()
           60     {
           61         ArrayList result = new ArrayList();
           62         try{
           63             UserTransaction userTx = getUserTransaction();
           64             userTx.setTransactionTimeout(1000);
           65             userTx.begin();
           66             Transaction tx = TxHelper.getTransaction();
           67             Connection conn = getConnection("t3://localhost:8011""TestXADS");
           68             if(conn != null) conn.close();
                            SQLThread thread1 
          = new SQLThread(tx,result,"t3://localhost:8011","TestXADS", SQL_INSERT);
           69             SQLThread thread2 = new SQLThread(tx,result,"t3://localhost:8021","TestXADS_1", ANO_SQL_INSERT);
           70             thread1.start();
           71             thread2.start();
           72             while(result.size() != 2){
           73                 Thread.currentThread().sleep(1);
           74             }
           75             if(checkCompletion(result)){
           76                 userTx.commit();
           77             }
           78             else{
           79                 userTx.rollback();
           80             }
           81         }catch(Exception e){
           82             e.printStackTrace();
           83         }
           84     }
           85     
           86     private boolean checkCompletion(ArrayList result){
           87         boolean toReturn = true;
           88         for(int loop=0; loop<result.size(); loop++){
           89             if((!((String)result.get(loop)).equals("OK"))){
           90                 toReturn = false;
           91                 break;
           92             }
           93         }
           94         return toReturn;
           95     }
           96     
           97     class SQLThread extends weblogic.kernel.AuditableThread {
           98         
           99         private Transaction tx = null;
          100         private ArrayList result = null;
          101         private String dsName = null;
          102         private String url = null;
          103         private String sql = null;
          104         
          105         public SQLThread(Transaction tx,ArrayList result,String ds, String url, String sql){
          106             this.tx = tx;
          107             this.result = result;
          108             this.dsName = ds;
          109             this.url = url;
          110             this.sql = sql;
          111         }
          112         
          113         public void run(){
          114             Connection conn = null;
          115             try{
          116                 TransactionManagerImpl tm = (TransactionManagerImpl)TransactionManagerImpl.getTransactionManager();
          117                 tm.internalResume((TransactionImpl)tx);
          118                 DriverTest test = new DriverTest();
          119                 conn = test.getConnection(url, dsName);
          120                 test.executeInsertInPSMT(conn, sql);
          121                 conn.close();
          122                 tm.internalSuspend();
          123                 result.add("OK");
          124             }catch(Exception e){
          125                 result.add("NA");
          126                 e.printStackTrace();
          127             }finally{
          128                 try{
          129                     if(conn != null)
          130                         conn.close();
          131                 }catch(Exception e){
          132                     e.printStackTrace();
          133                 }
          134             }
          135         }
          136     }
          137 }
          138 
          139 

                  下面是關于上面這段測試代碼的一些解釋和代碼中的限制:
                   1:為什么會在66行出現Connection conn = getConnection("t3://localhost:8011""TestXADS");這個看似無用的語句?Weblogic的TM實現中只有有XAResource參與到這個global transaction的server實例才有資格充當這個global transaction的coordinator,其他的server實例只能充當sub-coordinator。而且總是第一個參與全局事務的XAResource的實例充當coordinator,因為coordinator的委任決定于TX開始后,第一次RMI request發送給哪個server。Connection conn = getConnection("t3://localhost:8001""TestXADS")用于指定這個global transaction的coordinator為8011這個server。如果沒有這個語句,thread1,thread2啟動后,它們開始XA操作時,每個XAResouce都會把自己當作這個TX的coordinator(Thread1委任8011,Thread2委任8021),這樣就會出現如下的異常,
                  
          javax.transaction.TransactionRolledbackException: Current server is the coordinator and transaction is not found.  It was probably rolled back and forgotten already.
           at weblogic.rjvm.BasicOutboundRequest.sendReceive(BasicOutboundRequest.java:108)
           at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:290)
           at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:247)
           at weblogic.jdbc.common.internal.RmiDataSource_814_WLStub.getConnection(Unknown Source)
           at weblogic.transaction.internal.DriverTest1.getConnection(DriverTest1.java:39)
           at weblogic.transaction.internal.DriverTest1.access$0(DriverTest1.java:34)
           at weblogic.transaction.internal.DriverTest1$SQLThread.run(DriverTest1.java:135)

                  2:某個全局事務中啟動的線程,不能同時操作同一個XAResource,比如Thread1操作datasource1和datasource2,thread2操作datasource2和datasource3。Weblogic中,我們做XA操作的時候,需要同后端的XA Resource Manager交互,交互中我們會多次調用xaStart(xid, flag),xaEnd(xid, flag)這里的flag可以使NOFLAGS、TMSUCESS、TMRESUME、TMSUSPEND等。如果我們在同一個全局事務的多個線程中同時操作某個RESOURCE,那么就可能我們不同線程先后給這個RESOUCE的RM發送相同的FLAG,比如xaStart(xid, TMSUSPEND),即兩個線程同時發送TMSUSPEND,這樣會引發XA_ERR,如下:

          java.sql.SQLException: Unexpected exception while enlisting XAConnection java.sql.SQLException: XA error: XAER_RMERR : A resource manager error has occured in the transaction branch start() failed on resource 'TestXAPool_1': XAER_RMERR : A resource manager error has occured in the transaction branch
          oracle.jdbc.xa.OracleXAException
           at oracle.jdbc.xa.OracleXAResource.checkError(OracleXAResource.java:1017)
           at oracle.jdbc.xa.client.OracleXAResource.start(OracleXAResource.java:227)
           at weblogic.jdbc.wrapper.VendorXAResource.start(VendorXAResource.java:50)
           at weblogic.jdbc.jta.DataSource.start(DataSource.java:629)
           at weblogic.transaction.internal.XAServerResourceInfo.start(XAServerResourceInfo.java:1142)
           at weblogic.transaction.internal.XAServerResourceInfo.xaStart(XAServerResourceInfo.java:1073)
           at weblogic.transaction.internal.XAServerResourceInfo.enlist(XAServerResourceInfo.java:241)
           at weblogic.transaction.internal.ServerTransactionImpl.enlistResource(ServerTransactionImpl.java:463)
           at weblogic.jdbc.jta.DataSource.enlist(DataSource.java:1392)
           at weblogic.jdbc.jta.DataSource.refreshXAConnAndEnlist(DataSource.java:1334)
           at weblogic.jdbc.jta.DataSource.getConnection(DataSource.java:396)
           at weblogic.jdbc.jta.DataSource.connect(DataSource.java:354)
           at weblogic.jdbc.common.internal.RmiDataSource.getConnection(RmiDataSource.java:305)
           at weblogic.jdbc.common.internal.RmiDataSource_WLSkel.invoke(Unknown Source)
           ......

                  雖然測試中沒有什么問題,但我不建議誰這么去做,畢竟我們需要遵循規范。寫這么個例子,只是讓大家對weblogic的transaction加深些理解,而不是真的要在生產系統中這樣去做。
          posted on 2009-07-31 15:18 走走停停又三年 閱讀(2493) 評論(0)  編輯  收藏 所屬分類: Weblogic
          主站蜘蛛池模板: 邳州市| 灵川县| 吴川市| 滨海县| 咸阳市| 沁水县| 来宾市| 临桂县| 乡城县| 义马市| 乌兰浩特市| 渭南市| 靖远县| 呈贡县| 登封市| 花莲市| 获嘉县| 离岛区| 鹤岗市| 苍梧县| 深泽县| 聊城市| 会泽县| 永年县| 东宁县| 田林县| 吴堡县| 唐海县| 富裕县| 七台河市| 新邵县| 中山市| 梁山县| 汶上县| 来安县| 敖汉旗| 平乡县| 夏河县| 修文县| 安宁市| 来安县|