建議查看原帖:http://www.javaeye.com/topic/80697
開發目的:一個協同平臺項目,多托管用戶,單門戶系統,每個托管用戶對應一個單一數據庫,要求根據登陸用戶的單位信息,自動選擇操作數據庫;同時,涉及跨庫操作(比如跨庫查詢,跨庫單據發送);同時事務處理必須支持這種多數據庫模式,支持一些邏輯性不強的跨庫事務,比如一些數據的發送和接收等
當然,如果說跨庫操作只涉及到數據的發送和接受的話,也可以通過構建專門web service以及通信線程來處理,
開發環境: tomcat4.1,webwork2.2.4,spring2.0.4,hibernate3.1,osworkflow2.8,mysql5.0.19 由于正式發布的應用服務器是weblogic8.1,所以沒有采用jdk5環境以及struts2
準備:
問題一由于有跨庫操作,而且這種跨庫操作無法預知,有的跨庫操作完全是在邏輯運行中決定的,比如A托管用戶或則C、D向B托管用戶發了訂單 ,B回復,這個回復是根據訂單發送者來說的,具體到后臺操作,是無法事先預知針對具體哪個后臺物理數據庫操作的.所以,也就是說,存在在業務執行過程中切換數據庫的情況,傳統的到注入設置dao類 sessionFactory、靠filter以及Interceptor設置線程安全的sessionFactory都無法完全達到設計目的
問題二事務,本來,我打算用JtaTransactionManager的,除了JtaTransactionManager,在開始時也實在沒想到什么好的辦法, 難道,JtaTransactionManager是唯一選擇么?
步驟:
、因為問題一,所以系統在資源方面是多sessionFactory并存的方式,也就是多少個托管用戶多少個sessionFactory,當然,事實上,這種應用型項目本身并發訪問量不會太高(什么,很高么,總比不過廣告聯盟吧,哈哈).不用擔心多了幾個sessionFactory會對系統或則數據庫造成多大影響.
然后復制,粘貼,修改jdbcUrl,象網站編輯一樣..
假設,我配置了兩個sessionFatory ,一個是mitSessionFactory,一個是testSessionFactory,如下:
這個我自己系統配置的一部分,系統會解析他,從而知曉究竟存在多少個sessionFactory,item's XmlNode中的id可以理解會托管客戶的客戶單位
編號,當然,這個配置完全可以忽略,直接從ApplicationContext中一樣可以獲取到這樣的信息
在客戶登陸的時候,系統要記錄下該客戶所屬托管單位,然后通過上面的id找到bean's name ,最后獲取這個sessionFactory,托管單位信息一般
都是個編號而已跟己方系統的托管用戶管理相結合,一般是保存這個編號在session里面,也可以象asp.net一樣,記錄在安全憑證里,還不知道JAVA方面有沒有類似實現,個人認為asp.net這個方法很值得采用,雖然MS號稱安全系數+++++這個觀點值得懷疑
首先建立一個類,HibernateSupport ,存放當前請求線程所需sessionFactory- public class HibernateSupport {
- public static final String HIBERNATE_SESSIONIDKEY = "com.mit.hibernatesupport.factory.id";
- private static final Logger logger = Logger.getLogger(HibernateSupport.class);
- private static ApplicationContext applicationContext ;
- private static boolean singleSession=true;
- private static Map factorybeanset;
- private static ThreadLocal switchhistory;//在切換不同sessionFactory的時候用到
- private static ThreadLocal idset;//記錄當前默認的托管用戶id,在實際使用中,這個是可以取消掉的
- private static ThreadLocal curfactory;////當前正在使用的sessionFactory
- private static ThreadLocal trace;//一個sessionFactory集合,用來記錄這次線程調用了那些sessionFactory
- static
- {
- idset = new ThreadLocal();
- curfactory = new ThreadLocal();
- trace = new ThreadLocal();
- switchhistory = new ThreadLocal();
- }
- /**
- * set current sessionfactory for the Request
- * @param ServletContext
- * @param the factory's id defined in courser.xml
- */
- public static synchronized void setCurrent(ServletContext context,Object id)
- {
- if (idset.get()==null)
- {
- idset.set(id);
- if (factorybeanset.containsKey(id))
- {
- if (applicationContext==null)
- {
- applicationContext =
- WebApplicationContextUtils
- .getWebApplicationContext(context);
- }
- curfactory.set((SessionFactory)applicationContext
- .getBean((String)factorybeanset.get(id)));
- putTrace(idset.get(),(SessionFactory)curfactory.get());
- }
- }
- }
- /**
- * put the sessionfactory to tracemap
- * @see COPenSessionInViewFilter release sessionfactory in tracemap
- * @param the factory's id defined in courser.xml
- * @param hibernate's sessionfactory
- */
- private static void putTrace(Object id ,SessionFactory factory)
- {
- Map tracemap = null;
- if (trace.get()==null)
- {
- tracemap = new HashMap();
- trace.set(tracemap);
- }
- else
- {
- tracemap = (Map)trace.get();
- }
- if (!tracemap.containsKey(id))
- {
- tracemap.put(id, factory);
- }
- }
- /**
- * switch current sessionfactory
- * @param the factory's id defined in courser.xml
- */
- public static synchronized void swtichFactory(Object id)
- {
- if (!idset.get().equals(id) )
- {
- if (factorybeanset.containsKey(id))
- {
- SessionFactory oldfactory = (SessionFactory)curfactory.get();
- SessionFactory newfactory = (SessionFactory)applicationContext
- .getBean((String)factorybeanset.get(id));
- curfactory.set(newfactory);
- pushHistory(oldfactory);
- putTrace(id,newfactory);
- bindSessionFactory(newfactory);
- }
- }
- }
- /**
- * restore sessionfactory from queue of switchhistory
- */
- public static synchronized void restoreFactory()
- {
- SessionFactory factory = popHistory();
- if (factory!=null)
- {
- curfactory.set(factory);
- }
- }
- /**
- * push old sessionfactory to swithhistory after swtichFactory
- * @param hibernate's sessionfactory
- */
- private static void pushHistory(SessionFactory sessionfactory)
- {
- LinkedList list = null;
- if (switchhistory.get()==null)
- {
- list = new LinkedList();
- switchhistory.set(list);
- }
- else
- {
- list = (LinkedList)switchhistory.get();
- }
- list.add(0,sessionfactory);
- }
- /**
- * pop sessionfactory in queue
- */
- private static SessionFactory popHistory()
- {
- if (switchhistory.get()!=null)
- {
- LinkedList list = (LinkedList)switchhistory.get();
- if (list.size()>0)
- {
- SessionFactory factory = (SessionFactory)list.getFirst();
- list.removeFirst();
- return factory;
- }
- }
- return null;
- }
- public static Map getTraceMap()
- {
- if (trace.get()!=null)
- {
- return (Map)trace.get();
- }
- return null;
- }
- public static SessionFactory getCurrentFactory()
- {
- return (SessionFactory)curfactory.get();
- }
- public static synchronized void release()
- {
- idset.set(null);
- curfactory.set(null);
- switchhistory.set(null);
- trace.set(null);
- }
- /**
- * °ó¶¨sessionFactoryµ½springµÄ×ÊÔ´¹ÜÀí
- * @param hibernate's sessionfactory
- */
- private static synchronized boolean bindSessionFactory(SessionFactory sessionFactory)
- {
- boolean participate=false;;
- if (singleSession) {
- // single session mode
- if (TransactionSynchronizationManager.hasResource(sessionFactory)) {
- // Do not modify the Session: just set the participate flag.
- participate = true;
- }
- else {
- logger.debug("Opening single Hibernate Session in OpenSessionInViewFilter");
- Session session = getSession(sessionFactory);
- if (!TransactionSynchronizationManager.hasResource(sessionFactory))
- {
- TransactionSynchronizationManager.bindResource(sessionFactory, new SessionHolder(session));
- }
- }
- }
- else {
- // deferred close mode
- if (SessionFactoryUtils.isDeferredCloseActive(sessionFactory)) {
- // Do not modify deferred close: just set the participate flag.
- participate = true;
- }
- else {
- SessionFactoryUtils.initDeferredClose(sessionFactory);
- }
- }
- return participate;
- }
- //see SessionFactoryUtils
- private static Session getSession(SessionFactory sessionFactory) throws DataAccessResourceFailureException {
- Session session = SessionFactoryUtils.getSession(sessionFactory, true);
- FlushMode flushMode = FlushMode.COMMIT;
- if (flushMode != null) {
- session.setFlushMode(flushMode);
- }
- return session;
- }
- public static synchronized void initSessionFactory(Map res,Class loadclass)
- {
- factorybeanset =res;
- }
- }
- if (idset.get()==null)
- {
- idset.set(id);
- if (factorybeanset.containsKey(id)) //factorybeanset包含的就是我自己系統配置中那一部分,key就是id,,value就是sessionFactory 在spring環境中的beanName
- {
- if (applicationContext==null)
- {
- applicationContext =WebApplicationContextUtils.getWebApplicationContext(context);
- }
- curfactory.set((SessionFactory)applicationContext.getBean((String)factorybeanset.get(id)));//設置當前的sessionFactory
- putTrace(idset.get(),(SessionFactory)curfactory.get());//put到當前線程的一個記錄集
- }
- }
多sessionFactory好做,配置在spring或則單獨拿出來處理都可以,但是spring的HibernateDaoSupport 必須綁定一個sessionFactory,當然,我們完全可以寫一個自己的HibernateDaoSupport ,但是既然用了spring的事務管理而又不想多花時間,還是將就改改用吧
- private SessionFactory sessionFactory;
- publicvoid setSessionFactory(SessionFactory sessionFactory) {
- this.sessionFactory = sessionFactory;
- }
去掉HibernateAccessor和HibernateTransactionManager中的上述兩段代碼,當然,也別忘了斃掉兩個類中的
afterPropertiesSet方法中那些檢查代碼
然后 ,ant打包就可以了,如果不想修改spring的代碼,也可以單獨把這幾個類提出來另建jar包,我是單獨提出來新建的,比如HibernateTransactionManager我改名成CHibernateTransactionManager,其他類似,但是包名必須是org.springframework.orm.hibernate3 ,誰也不想這么做,可是誰讓sessionFactoryUtils中一個closexxxx方法沒定義成public了??
如果想變更sessionFactoryUtils,奉勸算了吧..
然后可以做測試了,首先,部署測試的dao和service,先是事務部署
- <beanid="transactionManager"
- class="com.mit.web.hibernate.CHibernateTransactionManager"/>
- <beanid="transres"class="org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource">
- <propertyname="properties">
- <props>
- <propkey="load*">PROPAGATION_REQUIRED,readOnlyprop>
- <propkey="save*">PROPAGATION_REQUIREDprop>
- <propkey="delete*">PROPAGATION_REQUIREDprop>
- <propkey="find*">PROPAGATION_REQUIRED,readOnlyprop>
- <propkey="query*">PROPAGATION_REQUIRED,readOnlyprop>
- <propkey="create*">PROPAGATION_REQUIREDprop>
- <propkey="set*">PROPAGATION_REQUIRED,readOnlyprop>
- <propkey="execute*">PROPAGATION_REQUIREDprop>
- props>
- property>
- bean>
- <beanid="transactionInterceptor"class=span
事務配置如下:
- <bean id="transactionManager"
- class="com.mit.web.hibernate.CHibernateTransactionManager"/>
- <bean id="transres" class="org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource">
- <property name="properties">
- <props>
- <prop key="load*">PROPAGATION_REQUIRED,readOnly</prop>
- <prop key="save*">PROPAGATION_REQUIRED</prop>
- <prop key="delete*">PROPAGATION_REQUIRED</prop>
- <prop key="find*">PROPAGATION_REQUIRED,readOnly</prop>
- <prop key="query*">PROPAGATION_REQUIRED,readOnly</prop>
- <prop key="create*">PROPAGATION_REQUIRED</prop>
- <prop key="set*">PROPAGATION_REQUIRED,readOnly</prop>
- <prop key="execute*">PROPAGATION_REQUIRED</prop>
- </props>
- </property>
- </bean>
- <bean id="transactionInterceptor" class="org.springframework.transaction.interceptor.TransactionInterceptor">
- <property name="transactionManager" ref="transactionManager"/>
- <property name="transactionAttributeSource"><ref local="transres"/></property>
- </bean>
- <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
- <property name="beanNames"><value>*Service</value></property>
- <property name="interceptorNames">
- <list>
- <value>transactionInterceptor</value>
- </list>
- </property>
- </bean>
- <bean class="org.springframework.transaction.interceptor.TransactionAttributeSourceAdvisor">
- <property name="transactionInterceptor" ref="transactionInterceptor"/>
- </bean>
我把事務配置在service層,事務管理采用的是我修改后的HibernateTransactionManager ,他不在要求sessionFactory的注入了
以下是測試的dao類和service類配置
- <bean id="personDao" class="com.mit.web.action.PersonDaoImpl"/>
- <bean id="personService" class="com.mit.web.action.PersonServiceImpl">
- <property name="personDao">
- <ref bean="personDao"/>
- </property>
- </bean>
- <bean id="departDao" class="com.mit.web.action.DepartDaoImpl"/>
- <bean id="departService" class="com.mit.web.action.DepartServiceImpl">
- <property name="personService">
- <ref bean="personService"/>
- </property>
- <property name="departDao">
- <ref bean="departDao"/>
- </property>
- </bean>
具體代碼不寫了,大概如下:
- public class PersonDaoImpl extends CHibernateDaoSupport implements PersonDao;
- public class PersonServiceImpl implements PersonService;
- public class DepartDaoImpl extends CHibernateDaoSupport implements DepartDao;
- public class DepartServiceImpl implements DepartService;
測試代碼的目的是將兩個實體類分別存放到mit和test庫,假設默認用戶是mit這個托管庫的,這個方法就寫在DepartServiceImpl類里面
- public class DepartServiceImpl implements DepartService {
- private DepartDao dao;
- private PersonService service;
- ..............................
- public void executeTest(Depart depart,Person person)
- {
- dao.save(depart);
- HibernateSupport.switchFactory("test");
- service.save(person);
- HibernateSupport.restoreFactory();
- }
- ..............................
上面代碼中將depart對象存到mit庫,而將person對象存到test庫中.
事務 事務 ,沒看見特別事務設置? 其實事務已經在運行了
如前面配置,事務我配置在了service層,那么當executeTest執行時,會啟動針對mit庫的事務(前面我們改了HibernateTransactionManager的sessionFactory獲取方式),
dao.save(depart);//然后保存depart到mit庫
HibernateSupport.switchFactory("test");//切換庫到test
service.save(person);//調用personService的save方法,這也是個service,所以也會產生一個事務,而此時HibernateSupport.getCurrentFactory返回的sessionFactory已經是test庫的了,所以,spring事務管理發現上下文中并沒有針對test的事務,于是,會重新啟動一個新的事務,這個service.save(person);方法執行完后,這個事務將會提交
HibernateSupport.restoreFactory();//切換回默認數據庫
整個方法執行完后,針對mit的事務將會被提交
[color=green]如果service.save(person);發生異常,這兩個事務就會被提交,一個簡單跨庫事務控制完成[color=green]
但是,問題也隨之而來,如果在 HibernateSupport.restoreFactory();后,又進行了一些邏輯操作,那么發生異常時,而由于 service.save(person);這個事務已經被提交,也就是說,針對test的事務已經提交不會回滾了,這是個非常嚴重的問題。。
其實,也有解決方法,就是延遲提交,具體實現方式以后講述
跨庫事務之延遲提交
延遲事務,就是將事務延后提交,延遲的時間由事務管理器掌握。在我的系統中,只有跨庫操作涉及到延遲提交,針對這種操作,我設計了一個基本的執行模型。就是如果一個邏輯中存在多個事務,將全部放到邏輯執行完以后提交,那么,既然如此,開始吧
PlatformTransactionManager是spring平臺相關事務,比如HibernateTransactionManager都是繼承于此類,為了達到延遲提交的目的,可以在AbstractPlatformTransactionManagershang上做修改達到目的
首先,說一下在spring中,通常的一個事務流程,
流程如下:初始化Transaction B,如果發現前面有其他Transaction ,比如 Transaction A,那么掛起TransactionA ,然后啟動事務 ,當邏輯執行完后 ,commit,恢復掛起事務A,然后清除同步對象以及其他資源,如果執行發生異常,當然,異常發生后 rollback
延遲提交的設計思想是將事務都暫存在一個threadlocal的LIST里面,等邏輯執行完以后,再統一提交,那么首先在AbstractPlatformTransactionManager中設置一個threadlocal對象
- private ThreadLocal lazytrace = new ThreadLocal();
LazyTransactionStatus用來管理需要延遲提交的事務
- private static class LazyTransactionStatus
- {
- private java.util.List transliat;
- private int transnum;//代表未提交事務數量
- public LazyTransactionStatus()
- {
- transliat= new java.util.ArrayList();
- transnum=0;
- }
- public void newLazy()
- {
- transnum++;
- }
- public void push(TransactionStatus trasobj)
- {
- objmap.add(trasobj);
- }
- public void removeLazy(TransactionStatus trasobj)
- {
- transnum--;
- }
- public boolean canCommit()
- {
- if (transnum<1)
- {
- return true;
- }
- else
- return false;
- }
- public java.util.List getTransactionObjs()
- {
- return transliat;
- }
- }
這就是local對象的存儲內容.translist存放當前執行中的TransactionStatus實例
TransactionStatus顧名思義,事務狀態,包含了事務、掛起資源等一系列信息
然后以事務提交為例
然后在AbstractTransactionManager增加如下兩個方法
- public final boolean isCommit(TransactionStatus status)
- {
- if (lazytrace.get()!=null)
- {
- LazyTransactionStatus lazystatus = (LazyTransactionStatus)lazytrace.get();
- lazystatus.removeLazy(status);
- return lazy.canCommit();
- }
- return true;
- }
- protected void begin(Object transaction, TransactionDefinition definition)
- {
- doBegin(transaction,definition);
- LazyTransactionStatus lazystatus = null;
- if (lazytrace.get()==null)
- {
- lazystatus = new LazyTransactionStatus();
- lazytrace.set(lazystatus);
- }
- else
- {
- lazystatus = (LazyTransactionStatus)lazytrace.get();
- }
- lazystatus.newLazy();
- }
- public final void registerTraceStatus(TransactionStatus status)
- {
- LazyTransactionStatus lazystatus = null;
- if (lazytrace.get()==null)
- {
- lazystatus = new LazyTransactionStatus();
- lazytrace.set(lazystatus);
- }
- else
- {
- lazystatus = (LazyTransactionStatus)lazytrace.get();
- }
- lazystatus.push(status);
- }
begin ,當一個事務開始時,將LazyTransactionStatus的transnum+1,表示又多了個剛開始,還未提交的事務
registerTraceStatus發生在commit的時候,注冊這個事務到LazyTransactionStatus,同時,
注意 transnum表示的是未提交事務數量,所以當事務管理器執行commit表示要提交一個事務后,transnum將減一,如果減一后發現transnum<1,表示所有事務都提交了,那么,將所有事務提交。否則,不提交,繼續等待...
關系如下:
begin->transnum+1 表示新事務誕生
registerTraceStatus(發生在commit的時候)->將準備提交的TransStatus放到LazyTransactionStatus,是的,這個事務要提交了,來吧,先注冊一下
緊接著
isCommit()->將transnum-1,如果發現transnum小于1 ,表示鬧夠了,可以都提交滾蛋了
注意 ,transnum與LazyTransactionStatus的translist的鏈表長度在執行commit的時候是反方向發展的 一個增,一個減
好了,首先是注冊事務數量,不用管了,在事務開始時begin方法它自己會調用了,
然后是修改AbstractPlatformTransactionManager的commit方法
- public final void commit(TransactionStatus txstatus) throws TransactionException {
- this.registerTraceStatus(txstatus);
- if (this.isCommit(txstatus))
- {
- int error = 0;
- LazyTransactionStatus lazystatus = (LazyTransactionStatus)lazytrace.get();
- List statuslist = lazystatus.getTransactionObjs();
- for (int i=0;i<statuslist.size();i++)
- {
- try
- {
- TransactionStatus status = (TransactionStatus)statuslist.get(i);
- if (status.isCompleted()) {
- error++;
- continue;
- //throw new IllegalTransactionStateException(
- // "Transaction is already completed - do not call commit or rollback more than once per transaction");
- }
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- if (defStatus.isLocalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Transactional code has requested rollback");
- }
- processRollback(defStatus);
- error++;
- continue;
- }
- if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
- }
- processRollback(defStatus);
- // Throw UnexpectedRollbackException only at outermost transaction boundary
- // or if explicitly asked to.
- if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
- //throw new UnexpectedRollbackException(
- //"Transaction rolled back because it has been marked as rollback-only");
- error++;
- continue;
- }
- continue;
- }
- processCommit(defStatus);
- }
- catch (Exception ex)
- {
- error++;
- ex.printStackTrace();
- continue;
- }
- }
- lazytrace.set(null);
- if (error>0)
- throw new IllegalTransactionStateException(
- "Not commit all transaction");
- }
- }
this.registerTraceStatus(txstatus);//事務提交了,成了嫌疑犯,拖到threadlocal的LazyTransactionStatus監獄里面先關起來
if (isCommit()) //看看監獄的事務是不是滿了,如果滿了,就可以全放了
LazyTransactionStatus lazystatus = (LazyTransactionStatus)lazytrace.get();
List statuslist = lazystatus.getTransactionObjs();
for (int i=0;i<statuslist.size();i++)
{
........
processCommit(defStatus);
//看來真的滿了,都放了吧
回滾道理是一樣的,不過不用判斷了,直接全部放出來讓他們滾吧
當然,目前這個實現只是個模型,真要實際應用,還需要做進一步封裝,實際做,我用了OpenSessionInViewFilter,我已經做過測試,測試了了OpenSessionInViewFilter中singleSession為true和false兩種情況,測試都通過了,呵呵