走自己的路

          路漫漫其修遠兮,吾將上下而求索

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            50 隨筆 :: 4 文章 :: 118 評論 :: 0 Trackbacks

          背景

          應用項目組每個小時會定時的run一個存儲過程進行結算,每次執行的時間也許會超過一個小時,而且需要絕對保證存儲過程的串行執行。因為使用內存鎖不能絕對保證兩個存儲過程的串行執行,因為應用服務器down掉重啟后可能會出現并發執行的情況,因為先前的存儲過程還在db中運行。我們是使用LTS,對quartz進行了封裝來做任務調度的。我們決定鎖的管理操作由framework來實現。原因是:

          l         鎖管理器可以做成通用的模塊

          l         申請鎖,釋放鎖是比較危險的操作,擔心業務開發人員由于遺忘導致死鎖或者并發問題

          l         可以很好的集成到我們現有的framework中,方便地開放給業務開發人員使用

          注意:我們極其不推薦使用悲觀離線鎖,如果沖突出現的概率比較少,可以用其他方法比如樂觀離線鎖,DB Constraint再通過補償操作能解決的問題,請不要使用悲觀離線鎖。

           

          原理

          PLSQL UL LOCKoracle提供出來給開發人員使用的鎖資源,功能和DML鎖是類似的,當然我們可以通過DML鎖來完成并發控制,select…for update或者自己維護一張鎖表,考慮到實現代價,我們打算使用PLSQL UL LOCK。而且Oracle保證了session釋放時,UL lock都會被釋放。

          但是用它時,需要注意到它的DBMS_LOCK.Unique函數它每次都會commit數據。如果是在分布式事務當中,會拋出事務已提交的異常。因為我們使用的是XA resource并且transaction levelglobal的,也就是JTA。為了使得鎖的申請和釋放不影響分布式業務事務,或者我們是使用非xaresourcelocaltransaction來完成鎖操作,或者也可以暫停已有事務,等鎖操作完成后resume暫停的分布式事務。考慮到重用已有的xa resource我們打算使用后一種方法,其實這種方法我們也會經常使用,暫停分布式事務做DDL操作,再釋放事務。

           

          實現方法:

          l         封裝DBMS_LOCK包中的幾個存儲過程為我們所用

          l         Java端提供一個基于PLSQL UL鎖的管理器

          l         Java端定義好申請鎖,業務操作,釋放鎖的使用流程,作為一個模板

           

          DB存儲過程:

          DBMS_LOCK做了簡單的封裝,避免直接調用DBMS_LOCK。這樣做的好處是:

          l         Oracle解耦,如果其他數據庫可以提供類似的功能,我們也可以用同名的存儲過程實現

          l         方便以后對存儲過程重構,升級

          l         我們需要對DBMS_LOCK進行簡單的封裝,因為DBMS_LOCK.Unique獲取lockhandle oracle中鎖的唯一標識,輸入是lockname,邏輯名,輸出是鎖的唯一標識,對java端應該是透明的,java端應該只關心鎖的邏輯名。

          create or replace package body frm_lts_processor_lock_pkg is
            
          /* table to store lockhandles by name */
             TYPE handle_tbltype IS TABLE OF varchar2(
          128)
                INDEX BY varchar2(
          128);

             v_lockhandle_tbl handle_tbltype;
            
           procedure frm_lts_lock_acquire(i_lock_name in varchar2, i_expiration_time in Integer default
          864000, i_wait_time in Integer default DBMS_LOCK.maxwait, o_result out number) as
                v_result     number;
                v_lockhandle varchar2(
          128);
             begin
             if v_lockhandle_tbl.count =
          0 then
                sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
                v_lockhandle_tbl(i_lock_name) := v_lockhandle;
             elsif v_lockhandle_tbl.exists(i_lock_name) then
                dbms_output.put_line(
          'atttacked');
                v_lockhandle := v_lockhandle_tbl(i_lock_name);
             else
               dbms_output.put_line(
          'new acquire');
               
          --acquire a unique lock id
               sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
               v_lockhandle_tbl(i_lock_name) := v_lockhandle;
                 
             end if;
             
          --acquire a lock
              v_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
           
             
          --set return values
               o_result := v_result;
           end frm_lts_lock_acquire;
           
           function frm_lts_lock_release(i_lock_name in varchar2) return number as
                 v_result number :=
          6;
                 v_lockhandle varchar2(
          128);
           begin
               
          --release lock according to lockhandle
                if v_lockhandle_tbl.exists(i_lock_name) then
                    v_lockhandle :=  v_lockhandle_tbl(i_lock_name);
                    v_result := sys.dbms_lock.release(v_lockhandle);
                    v_lockhandle_tbl.delete(i_lock_name);       
                end if;
                return v_result;
           end frm_lts_lock_release;

          end frm_lts_processor_lock_pkg;
          /

           

           

          鎖管理器:

          其實應用項目組有多個這樣的存儲過程,而這些存儲過程之間的串行執行可以有多個business key來決定的,比如job order numberdelivery order等。所以我們需要給他們提供多鎖管理機制。我們會對這多個鎖進行排序,以避免死鎖,并強烈推薦應用項目設置超時時間。這些business key是由String對象構成的,為了防止大量的業務操作被鎖在null或者空string這樣沒有意義的business key上面,我們對application提供的鎖集合還需要進行過濾。

          原理還是很簡單的,就是在本地事務中調用db端的申請鎖,釋放鎖的存儲過程,然后對返回的結果進行一系列處理。

          在使用多鎖機制的時候要保證,如果只申請到了部分鎖,在申請其中另外一個鎖時發生了錯誤或者超時,要能夠安全地將已申請的鎖釋放掉,所以多鎖申請需要記錄已申請到的鎖,并且記錄發生的錯誤,區分timeout和異常。Timeout返回false,如果出現異常記錄下來,最后拋出。釋放多鎖時,不能被中斷,記錄釋放每個鎖后的結果,最后判定如果其中一些鎖釋放時發生了錯誤,拋出。

           

          handleLock定義暫停jta事務,執行鎖操作,釋放jta事務流程

          private Object handleLock(Connection connection,
                      LocalTransactionCallback localTransactionCallback)
                      
          throws LockException {
                  TransactionManager tm 
          = null;
                  Transaction currentTx 
          = null;
                  Object result 
          = null;
                  
          try {
                      Context initialContext 
          = new InitialContext();
                      UserTransaction userTrx 
          = (javax.transaction.UserTransaction) initialContext
                              .lookup(
          "java:comp/UserTransaction");
                      
          if (!(userTrx.getStatus() == Status.STATUS_NO_TRANSACTION)) {
                          tm 
          = TransactionUtils.getTransactionManager(userTrx);
                          
          if (tm != null{
                              currentTx 
          = tm.suspend();
                          }

                      }

                      result 
          = localTransactionCallback
                              .executeInLocalTransaction(connection);

                      
          if (null != currentTx) {
                          tm.resume(currentTx);
                      }

                  }
           catch (NamingException e) {
                  }
           catch (SystemException e) {
                  }
           catch (InvalidTransactionException e) {
                  }
           catch (IllegalStateException e) {
                  }

                  
          return result;
              }


          多鎖申請操作是上面流程的一個回調

          private class ObtainMutipleLocksLocalTransactionCallback implements
                      LocalTransactionCallback 
          {
                  
          private Set<String> lockNames;
                  
          private int waitTime;

                  ObtainMutipleLocksLocalTransactionCallback(Set
          <String> lockNames,
                          
          int waitTime) {
                      
          this.lockNames = lockNames;
                      
          this.waitTime = waitTime;
                  }

                  
          public Object executeInLocalTransaction(Connection conn) {
                      CallableStatement lockAcquireStmt 
          = null;
                      Set
          <String> obtainedLockNames = new HashSet<String>();
                      
          boolean timeOut = false;
                      String timeOutLockName 
          = null;
                      Exception mifLockException 
          = null;
                      
          try {
                          lockAcquireStmt 
          = conn.prepareCall(OBTAIN_LOCK_PROC_CALL);
                          
          for (String lockName : lockNames) {
                              lockAcquireStmt.setString(
          1, lockName);
                              lockAcquireStmt.setInt(
          2, LCOK_EXPIRE_TIME);
                              lockAcquireStmt.setInt(
          3, waitTime);
                              lockAcquireStmt.registerOutParameter(
          4,
                                      java.sql.Types.INTEGER);
                              lockAcquireStmt.registerOutParameter(
          5,
                                      java.sql.Types.VARCHAR);
                              lockAcquireStmt.executeUpdate();
                              
          int lockacquireResult = lockAcquireStmt.getInt(4);
                              
          if (lockacquireResult == ULLockResultType.SUCCESSFUL) 
                                  obtainedLockNames.add(lockName);
                              }
           else if (lockacquireResult == ULLockResultType.TIMEOUT) {
                                  timeOut 
          = true;
                                  timeOutLockName 
          = lockName;
                                  
          break;
                              }
           else if (lockacquireResult != ULLockResultType.ALREADY_OWNED) {
                                  String lockResultDesc 
          = ULLockResultType
                                          .getAcquireTypeDesc(lockacquireResult);
                                  LockException lockException 
          = new LockException(
                                          
          "Obtain lock " + lockName
                                                  
          + " fails, the reason is "
                                                  
          + lockResultDesc + " .");
                                  lockException.setLockName(lockName);
                                  lockException.setLockHandlingResult(lockResultDesc);
                                  
          throw lockException;
                              }
           else {
                              }

                          }

                      }
           catch (Exception ex) {
                          mifLockException 
          = ex;
                      }
           finally {
                          
          if (null != lockAcquireStmt) {
                              
          try {
                                  lockAcquireStmt.close();
                              }
           catch (SQLException e) {
                                  
          // swallow
                              }

                          }

                      }

                      
          boolean success = true;
                      
          if (timeOut || mifLockException != null{
                          success 
          = false;
                      }

                      
          return new ObtainMultipleLocksResult(success, obtainedLockNames,
                              timeOut, timeOutLockName, mifLockException);
                  }

              }


          多鎖釋放操作也是事務暫停流程的一個回調

          private class ReleaseMultipleLocksLocalTransactionCallback implements
                      LocalTransactionCallback 
          {
                  
          private Set<String> lockNames;

                  ReleaseMultipleLocksLocalTransactionCallback(Set
          <String> lockNames) {
                      
          this.lockNames = lockNames;
                  }


                  
          public Object executeInLocalTransaction(Connection conn) {
                      CallableStatement lockReleaseStmt 
          = null;
                      Map
          <String, Exception> mifLockErrors = new HashMap<String, Exception>();
                      Set
          <String> releasedLocks = new HashSet<String>();
                      
          try {
                          
          try {
                              lockReleaseStmt 
          = conn.prepareCall(RELEASE_LOCK_PROC_CALL);
                          }
           catch (Exception ex) {
                              
          for (String lockName : lockNames) {
                                  mifLockErrors.put(lockName, ex);
                              }

          return new ReleaseMutipleLocksResult(false, releasedLocks, mifLockErrors);
                          }


                          
          for (String lockName : lockNames) {
                              
          try {
                                  lockReleaseStmt.registerOutParameter(
          1,
                                          java.sql.Types.INTEGER);
                                  lockReleaseStmt.setString(
          2, lockName);
                                  lockReleaseStmt.executeUpdate();
                                  
          int lockReleaseResult = lockReleaseStmt.getInt(1);
                                  
          if (lockReleaseResult == ULLockResultType.SUCCESSFUL) {
                                      releasedLocks.add(lockName);
                                  }
           else {
                                      String lockResultDesc 
          = ULLockResultType
                                              .getReleaseTypeDesc(lockReleaseResult);
                                      LockException lockException 
          = new LockException(
                                              
          "Release lock " + lockName
                                                      
          + " fails, the reason is "
                                                      
          + lockResultDesc + " .");
                                      lockException.setLockName(lockName);
                                      lockException.setLockHandlingResult(lockResultDesc);
                                      mifLockErrors.put(lockName, lockException);
                                  }

                              }
           catch (Exception ex) {
                                  mifLockErrors.put(lockName, ex);
                              }

                          }

                      }
           finally {
                          
          if (null != lockReleaseStmt) {
                              
          try {
                                  lockReleaseStmt.close();
                              }
           catch (SQLException e) {
                              }

                          }

                      }

                      
          boolean success = releasedLocks.size() == this.lockNames.size();
                      
          return new ReleaseMutipleLocksResult(success, releasedLocks,
                              mifLockErrors);
                  }

              }


          使用模板:注意鎖的釋放要寫在finally語句塊里面,保證鎖的釋放。

          定義好模板,防止Application用戶直接調用鎖管理器或者濫用鎖,忘記釋放鎖。我們決定定義一個模板,做到鎖的申請和釋放對application用戶來說是透明的,把它做成了隱含鎖。

          public void execute(JobExecutionContext context)
                      
          throws JobExecutionException {
                  Map jobDataMap 
          = context
                  .getJobDetail().getJobDataMap();
                  Collection
          <String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
                  Integer waitTimeInteger 
          = (Integer) jobDataMap
                  .get(LOCK_WAIT_TIME_SECONDS_KEY);
                  
          int waitTime = MAX_WAITTIME;
                  
          if (waitTimeInteger != null{
                      waitTime 
          = waitTimeInteger.intValue();
                  }

                  Set
          <String> uniqueLockKeys = new HashSet<String>(lockKeys);

                  
          // filter empty keys
                  Iterator<String> keyIterator = uniqueLockKeys.iterator();
                  
          while (keyIterator.hasNext()) {
                      String key 
          = keyIterator.next();
                      
          if (StringUtils.isEmptyNoOffset(key)) {
                          keyIterator.remove();
                      }

                  }

                  
          if (CollectionUtils.isNotEmptyCollection(uniqueLockKeys)) {
                      Set
          <String> obtainedLockNames = null;
                      Connection connection 
          = null;
                      
          try {
                          connection 
          = DataSource.getConnection();
                          ObtainMultipleLocksResult result 
          = LOCK_MANAGER.obtainLock(
                                  connection, uniqueLockKeys, waitTime);
                          obtainedLockNames 
          = result.getObtainedLockNames();
                          
          if (!result.isSuccess()) {
                              
          if (result.isTimeout()) {
                                
          //do log
                                  return;
                              }
           else {
                                  JobExecutionException jobException 
          = new JobExecutionException(
                                          
          "Obtain locks failed! "
                                                  
          + result.getMifLockException()
                                                          .getMessage(), result
                                                  .getMifLockException());
                                  
          throw jobException;
                              }

                          }

                          
          this. executeInLock (context);
                      }
           catch (Throwable e) {
                          
          throw new JobExecutionException(
                                  
          "Get db connection failed!" + e.getMessage(), e);
                      }
           finally {
                          
          if (null != connection) {
                              
          this.releaseLocks(connection, obtainedLockNames);
                              
          try {
                                  connection.close();
                              }
           catch (SQLException e) {
                                  
          throw new JobExecutionException(
                                          
          "close  db connection failed!" + e.getMessage(), e);
                              }

                          }

                      }

                  }
           else {
                      
          this.executeInLock(context);
                  }

              }


           

          executeInLockapplication的子類繼承實現

           

          緩存

          l         緩存悲觀離線鎖

          l         緩存lockhandle

          因為使用的是悲觀離線鎖,每次申請鎖都要跑一趟db,但如果當前線程已經是lock的所有者就不需要白跑一趟了。可以用ThreadLocal把當前線程已經擁有的鎖緩存起來,釋放鎖時對應的需要清除緩存。

          在申請鎖時,需要獲得UL Lock時的lockhandle,釋放鎖時也需要提供鎖的lockhandle,我們需要將它緩存起來,主要是因為DBMS_LOCK.Unique每次都會commit,會影響性能,這樣每次釋放鎖時就可以直接使用lockhandle了。有兩種方法對lockhandle進行緩存,緩存在java端作為實例變量,緩存在plsql包的全局變量中。緩存在java端需要注意的是,lock manager不能作為單例或者享元來使用,否則lock handle的緩存在多jvm之間也存在著并發控制和同步的問題。

          源代碼:

          Java:
          ULLock-sources.rar

          PLSQL:
          lockplsql.rar

          參考:

          http://docstore.mik.ua/orelly/oracle/bipack/ch04_01.htm




          posted on 2009-07-03 19:24 叱咤紅人 閱讀(1504) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
          博客園   IT新聞   Chat2DB   C++博客   博問  
           
          主站蜘蛛池模板: 酒泉市| 茶陵县| 桃源县| 雅江县| 许昌县| 山东| 东至县| 信阳市| 德兴市| 沂水县| 东山县| 丰顺县| 桐乡市| 嘉义县| 株洲市| 疏附县| 淳安县| 苗栗市| 宁德市| 临猗县| 新乡县| 乳源| 常州市| 丁青县| 岫岩| 平凉市| 鄂温| 五家渠市| 白城市| 清苑县| 贺兰县| 伊宁市| 锦州市| 托克托县| 五原县| 富顺县| 泸西县| 彝良县| 渝北区| 宁化县| 油尖旺区|