在用戶修改了領域對象的值后,我們有時需要記錄下用戶的改動。比如對一些關鍵業務對象的改動有時往往需要發郵件通知客戶。有時用戶可能想查閱所有歷史的改動,甚至有可能會改回原先的值。
領域邏輯關系往往比較復雜,這時我們會使用到ORM Framework。本文以toplink為例,講述如何利用toplink編寫一個完成此功能的簡易Framework,我們暫且把它稱為ActionMemed。
我們先來看一下大體的流程:
l 我們獲得用戶修改信息通常有兩種方式,一種是被動的監聽,另一種主動的通知。被動的監聽就是framework訂閱所關心領域對象的修改,主動通知是application主動的將修改之前和之后的對象通知framework。
l Framework接著從整個對象的樹結構中找出用戶所關心的某個特定的字段或者字段的組合,生成actionRecord。ActionRecord是描述用戶對領域對象修改的數據結構,會包括用戶修改的原因,修改者,修改的時間,修改的字段或者組合,修改前后的值等等信息。
l 在ActionRecord生成好之后,會將它記錄到DB,發郵件通知用戶或者通過JMS通知其他Application。
有了基本的概念后,看一下整體的結構:
Registry: 在TopLink上注冊ActionListener。一旦在TopLink上檢測到業務對象的改動就會調用ActionService,生成ActionRecord并調用相關的ActionRecorder。
ActionListener:TopLink的SessionListener,每次會話都會調用。我們在這里實現了preCommit方法,在UnitOfWork提交之前,捕捉用戶的所有修改,并從中選取出用戶所關心的對象的變動。
ActionService:當ActionListener從TopLink中獲得到改動的對象,就會調用ActionService生成ActionRecord,并通知相關的Recorder,可能是Log到DB。如果用戶是通過主動的方式傳入新老兩個對象就不需要Listener,直接調用ActionService,將新老對象或者新對象和ValueDistiller作為參數傳入,
ValueDistiler:根據當前的新對象,萃取出老對象。TopLink就可以根據當前UnitOfWork中的新對象獲取原始對象。方法是:
public Object getOriginalVersionOfObject(Object workingClone) |
Expression:ActionMemed相關的配置數據,由ExpressionParser解析出來后就會cache在內存中。這個配置可以是文件,或者DB配置。只要能描述清楚就行。文件配置我們直接利用spring bean。
ActionConstructor:Listener從TopLink ChangeSet中拿到的只是有改動的對象。而我們關心的只是對象上某個Field或者它引用的某個對象的Field,比如說Employee有PhoneNumber List,PhoneNumber有個屬性是areaCode,可能我們只關心areaCode值的更改,就只需要記錄areaCode的更改,并且通知客戶。所以我們需要根據用戶配置對新老對象進行對比,比較是否有關注的屬性被用戶更改了。并且構建ActionRecord。比較的方法我們可以用JXpath, Xpath的表達能力很強,而且還可以自定義函數,在自定義擴展函數里用戶可以對字段進行組合處理,從而生成它們自己想要記錄的值。
ActionRecorder:當Action構建完成后,ActionRecorder就要將它通知客戶,用JMS發給其他項目或者記錄到DB。用戶可以配置多個ActionRecorder。
MemedEventListener,讓用戶在ActionRecorder調用之前和之后做一些額外的處理。比如說用戶可能在之前對Action的數據結構加入一些定制信息。
上面介紹了ActionMemed的流程和相關模塊的功能。其實在使用中,特別是一次修改很多業務對象的時候,處理Action時間會有點長,況且Action的處理也并不需要實時。所以Action還需要提供異步處理的功能。
將異步調用的模塊圖和先前的結構圖進行比較會發現有兩處不同:
ServiceTask: 實現Java Runnable接口,基本實現類似于先前圖中的ActionService。
ObjectCloner: 如果我們使用TopLink,在異步的情況下,用戶當前的UnitOfWork(事務)會先提交,提交之后,從TopLink中萃取的舊對象會被Merge成新對象,這時我們只能提前在UnitOfWork提交之前自己根據Expression的結構深Copy一份出來。
ActionAsyncService: 為異步設計的ActionService,利用ValueDistiller從UnitOfwork獲得當前對象的原始clone,構建ServiceTask,將ServiceTask提交到ThreadPool,當task被執行時,就會調用ActionService,這時的ActionService重用了同步流程中的ActonService。
幾個注意點:
整個Framework的原理還是相當簡單的,稍微值得注意的可能是下面幾個方面。
Listener如何獲取被改動的對象
TopLink會把所有改動過的對象都會被放在UnitOfWorkChangeSet中,因為在UnitOfWork提交的時候它需要將UnitOfWorkChangeSet中記下的改動提交到數據庫。然后merge到session cache。所以所有改動從UnitOfWork中都是可以拿到的。
public class ActionLogPassiveAsyncListener extends AbstractActionLogAsyncListener { private ValueDistiller distiller; public void preCommitUnitOfWork(SessionEvent sessionevent) { log.debug("preCommitUnitOfWork begin."); if (null == unitOfWork.getUnitOfWorkChangeSet()) { unitOfWork.setUnitOfWorkChangeSet((oracle.toplink.internal.sessions.UnitOfWorkChangeSet) unitOfWork .getCurrentChanges()); } UnitOfWorkChangeSet ucs = unitOfWork.getUnitOfWorkChangeSet(); if (ucs != null && ucs.getAllChangeSets() != null) { Set finishedObjects = new HashSet(); Map<Class<?>, List<ChangedPair>> changedPairs = new HashMap<Class<?>, List<ChangedPair>>(); for (Enumeration objectChangeSetEnum = ucs.getAllChangeSets().keys(); objectChangeSetEnum .hasMoreElements();) { ObjectChangeSet objectChangeSet = (ObjectChangeSet) objectChangeSetEnum.nextElement(); if (objectChangeSet == null) { continue; } Object clone = objectChangeSet.getUnitOfWorkClone(); if (!finishedObjects.contains(clone)) { for (Class focusClass : this.focusClasses) { if ((includeSubclass ? focusClass.isAssignableFrom(clone.getClass()) : clone.getClass() == focusClass) && (filter == null || (filter != null && !filter.isFiltered(clone, unitOfWork)))) { finishedObjects.add(clone); if (objectChangeSet.hasChanges()) { List<ChangedPair> changedPairList = changedPairs.get(focusClass); if (null == changedPairList) { changedPairList = new ArrayList<ChangedPair>(); changedPairs.put(focusClass, changedPairList); } Object originalObject = this.distiller.getOriginObject(clone); changedPairList.add(new ChangedPair(originalObject, clone)); } } } } } if (!changedPairs.isEmpty()) { try { ChangedPairMap changedPairMap = this.assembleChangedPairMap(unitOfWork, changedPairs); this.changedPairCache.set(changedPairMap); } catch (ActionLogException e) { if (shouldBreakIfException) { throw new ActionLogRuntimeException(e); } } } log.debug("preCommitUnitOfWork end."); } } } |
異步狀態下,ActionRecord要在Application事務提交之后生成
同步狀態下,ActionRecord的生成可以Join Application的transaction,這樣他們會一起成功或者失敗。但是異步情況下,就會是不同的事務,兩個事務之間的關系可能是有先后順序或者互不相干。互不相干是不可能的,從業務意義上講只有Application的改動確實生效之后ActionRecord才能生成,但是將ActionRecord放在Application事務提交成功之后生成或者提交,也會面臨一個問題,就是application成功提交了,但ActionRecord的生成可能會失敗。但要知道ActionRecord失敗的幾率遠比Application提交失敗的幾率要小得多,application常常會因為樂觀鎖的問題而提交失敗,但ActionRecord只可能因為DB Shutdown而丟失數據。失敗后會做詳細的備份,以便做恢復。那如何感知application事務是提交成功還是失敗了呢?TopLink的SessionEventListener有四個有用的回調方法:PreCommit,PostCommit,PostRollback,PostRelease,用戶事務提交的時候在提交之前會調用PreCommit方法,這時我們還可以從UnitOfWork中獲取新老對象,我們會把老對象深clone一份出來,將他們存放在ThreadLocal中,而在PostCommit回調的實現中,我們會從ThreadLocal中取出新老對象完成ActionRecord的生成,而PostRollback就可以什么都不干了。但不管是提交成功還是提交失敗Rlease方法都會被調用,UnitOfWork需要release,這里我們就會去清空ThreadLocal,以便內存即時的垃圾回收。這樣說來即使是主動調用ActionAsyncService也會注冊一個Listener,不同的是這個Listener不需要從UnitOfWork檢測變化。
public class AbstractActionAsyncListener extends AbstractActionListener { protected ThreadLocal<ChangedPairMap> changedPairCache = new ThreadLocal<ChangedPairMap>(); public void postCommitUnitOfWork(SessionEvent arg0) { ChangedPairMap changedPairMap = this.changedPairCache.get(); if (changedPairMap != null) { if (!changedPairMap.isEmpty()) { //異步生成ActionRecord } } } public void postReleaseUnitOfWork(SessionEvent arg0) { if (this.changedPairCache.get() != null) { this.clearResource(); } } private void clearResource() { this.changedPairCache.set(null); } } |
public class ActionActiveAsyncListener extends AbstractActionAsyncListener { private Map<Class<?>, List<ChangedPair>> changedPairs; public void preCommitUnitOfWork(SessionEvent sessionEvent) { try { ChangedPairMap changedPairMap = this.assembleChangedPairMap(unitOfWork, this.changedPairs); this.changedPairCache.set(changedPairMap); } catch (ActionLogException e) { log.error("Assemble ChangePairMap fails! ChangedPairs: " + changedPairs.toString(), e); if (shouldBreakIfException) { throw new ActionRuntimeException(e); } } } } |
ObjectCloner:
如果對象樹的結構很龐大,深copy的性能代價不得不考慮。BeanUtils進行深copy的性能很差。5000個對象花了我20s。首先要說的是其實不需要所有對象引用都需要深copy,只有那些用戶對關注的對象屬性才需要深copy,clone的步驟大概如下:
l 對根對象進行淺copy
l 對用戶關心的對象屬性迭代的進行深copy
l 如果關心的對象屬性是Collection,淺copy Collection中的每個對象并深copy對象中用戶關注的對象屬性
l 其實那些domain class,早在做ORM的時就確定下來了,所以所有domain對象反射metadata都可以事先確定,存在內存中,這樣會大大提高性能,其實toplink也會把這些反射結構解析出來后緩存在內存中,直接利用toplink的clone邏輯就可以了。1000個對象深clone一把大約是120ms。