現(xiàn)在開始要介紹的從緩存中讀取數(shù)據(jù)的過程,還是在GeneralCacheAdministrator#getFromCache(),這里有3個同名方法,還是找一個參數(shù)最多的:

/** *//**
* Get an object from the cache
*
* @param key
* The key entered by the user.
* @param refreshPeriod
* How long the object can stay in cache in seconds. To allow the
* entry to stay in the cache indefinitely, supply a value of
* {@link CacheEntry#INDEFINITE_EXPIRY}
* @param cronExpression
* A cron expression that the age of the cache entry will be
* compared to. If the entry is older than the most recent match
* for the cron expression, the entry will be considered stale.
* @return The object from cache
* @throws NeedsRefreshException
* when no cache entry could be found with the supplied key, or
* when an entry was found but is considered out of date. If the
* cache entry is a new entry that is currently being
* constructed this method will block until the new entry
* becomes available. Similarly, it will block if a stale entry
* is currently being rebuilt by another thread and cache
* blocking is enabled (<code>cache.blocking=true</code>).
*/
public Object getFromCache(String key, int refreshPeriod, String cronExpression)

throws NeedsRefreshException {
return getCache().getFromCache(key, refreshPeriod, cronExpression);
}
還是來到了Cache的#getFromCache():

public Object getFromCache(String key, int refreshPeriod, String cronExpiry) throws NeedsRefreshException {
// 先嘗試在緩存中查找
CacheEntry cacheEntry = this.getCacheEntry(key, null, null);

// 實際緩存的對象
Object content = cacheEntry.getContent();

// 緩存訪問事件類型,默認是"緩存命中"
CacheMapAccessEventType accessEventType = CacheMapAccessEventType.HIT;

boolean reload = false;

// 判斷緩存是否過期

if (this.isStale(cacheEntry, refreshPeriod, cronExpiry)) {
// 得到緩存更新狀態(tài)
EntryUpdateState updateState = getUpdateState(key);

try {

synchronized (updateState) {

if (updateState.isAwaitingUpdate() || updateState.isCancelled()) {
// 如果是等待更新或者取消更新,那么將更新狀態(tài)設(shè)置成UPDATE_IN_PROGRESS
updateState.startUpdate();


if (cacheEntry.isNew()) {// 如果緩存中沒有,那么設(shè)成"沒命中"
accessEventType = CacheMapAccessEventType.MISS;

} else {// 設(shè)成"命中過期數(shù)據(jù)",這個后邊要拋異常的
accessEventType = CacheMapAccessEventType.STALE_HIT;
}

} else if (updateState.isUpdating()) {
// 如果其他線程正在更新緩存中,那么wait,等到更新完成后notify

if (cacheEntry.isNew() || blocking) {

do {

try {
updateState.wait();

} catch (InterruptedException e) {
}
} while (updateState.isUpdating());


if (updateState.isCancelled()) {
// 如果更新的線程取消了更新,那么緩存仍然是過期的,執(zhí)行和第一個分支相同的操作
updateState.startUpdate();


if (cacheEntry.isNew()) {
accessEventType = CacheMapAccessEventType.MISS;

} else {
accessEventType = CacheMapAccessEventType.STALE_HIT;
}

} else if (updateState.isComplete()) {
reload = true;

} else {
log.error("Invalid update state for cache entry " + key);
}
}

} else {
reload = true;
}
}

} finally {
// 線程引用數(shù)減少1
releaseUpdateState(updateState, key);
}
}


if (reload) {
// 可以重新載入緩存中的數(shù)據(jù)了
cacheEntry = (CacheEntry) cacheMap.get(key);


if (cacheEntry != null) {
content = cacheEntry.getContent();

} else {
log.error("Could not reload cache entry after waiting for it to be rebuilt");
}
}

// 發(fā)送緩存事件
dispatchCacheMapAccessEvent(accessEventType, cacheEntry, null);

// 如果數(shù)據(jù)過期,拋出需要刷新數(shù)據(jù)的異常

if (accessEventType != CacheMapAccessEventType.HIT) {
throw new NeedsRefreshException(content);
}

return content;
}
整個流程還是很好理解的,這里看一些細節(jié),首先是#isStale()方法:

protected boolean isStale(CacheEntry cacheEntry, int refreshPeriod, String cronExpiry) {
// 判斷是否需要刷新
boolean result = cacheEntry.needsRefresh(refreshPeriod) || isFlushed(cacheEntry);


if ((!result) && (cronExpiry != null) && (cronExpiry.length() > 0)) {

try {
// 利用計劃任務(wù)的方式處理過期
FastCronParser parser = new FastCronParser(cronExpiry);
result = result || parser.hasMoreRecentMatch(cacheEntry.getLastUpdate());

} catch (ParseException e) {
log.warn(e);
}
}

return result;
}
其次是獲取更新狀態(tài)的方法:

protected EntryUpdateState getUpdateState(String key) {
EntryUpdateState updateState;


synchronized (updateStates) {
// Try to find the matching state object in the updating entry map.
updateState = (EntryUpdateState) updateStates.get(key);


if (updateState == null) {
// It's not there so add it.
updateState = new EntryUpdateState();
updateStates.put(key, updateState);

} else {
// Otherwise indicate that we start using it to prevent its
// removal until all threads are done with it.
updateState.incrementUsageCounter();
}
}

return updateState;
}
另外就是EntryUpdateState的startUpdate()方法,它會把過期緩存的狀態(tài)設(shè)為UPDATE_IN_PROGRESS,這樣更新緩存的線程就會執(zhí)行更新操作了。

public int startUpdate() {

if ((state != NOT_YET_UPDATING) && (state != UPDATE_CANCELLED)) {
throw new IllegalStateException("Cannot begin cache update - current state (" + state
+ ") is not NOT_YET_UPDATING or UPDATE_CANCELLED");
}

state = UPDATE_IN_PROGRESS;
return incrementUsageCounter();
}
整個流程還是很好理解的,首先判斷緩存是否過期,如果過期,那么會從更新狀態(tài)緩存中查找更新狀態(tài),如果沒查到那么創(chuàng)建一個,如果已經(jīng)存在了,將引用計數(shù)加1。
如果緩存還沒有進行更新,那么將更新狀態(tài)設(shè)置為UPDATE_IN_PROGRESS,并且再次將引用計數(shù)加1,此時:
(1) 在設(shè)定完更新狀態(tài)后,讀取緩存的線程會將引用計數(shù)減1,如果沒有線程此時再引用了那么在更新狀態(tài)緩存中移除此項。
(2) 如果其他線程在更新緩存的時候會將引用計數(shù)減1,如果沒有線程此時再引用了那么在更新狀態(tài)緩存中移除此項。
這樣,通過上邊的兩個過程,就將引用計數(shù)變?yōu)?了。
想法其實是非常好的,不過我在實際的測試中發(fā)現(xiàn)有時候其實并不是完全按照這個流程走下去的(當然很有可能是當初設(shè)計就是如此,允許發(fā)生報出這種異常):
線程1調(diào)用getFromCache()并發(fā)現(xiàn)緩存過期,更新狀態(tài)為UPDATE_IN_PROGRESS后還沒有調(diào)用#releaseUpdateState(),線程2執(zhí)行#putInCache(),線程3執(zhí)行#putInCache()。線程2更新完狀態(tài)為UPDATE_COMPLETE后將引用計數(shù)減1,但是此時由于線程1還有一個引用計數(shù),所以線程3更新狀態(tài)時會在#completeUpdate()中拋出異常,因為此時的狀態(tài)是UPDATE_COMPLETE。
posted on 2010-12-15 08:45
臭美 閱讀(2075)
評論(0) 編輯 收藏 所屬分類:
Cache