現(xiàn)在開始要介紹的從緩存中讀取數(shù)據(jù)的過(guò)程,還是在GeneralCacheAdministrator#getFromCache(),這里有3個(gè)同名方法,還是找一個(gè)參數(shù)最多的:

/** *//**
* Get an object from the cache
*
* @param key
* The key entered by the user.
* @param refreshPeriod
* How long the object can stay in cache in seconds. To allow the
* entry to stay in the cache indefinitely, supply a value of
* {@link CacheEntry#INDEFINITE_EXPIRY}
* @param cronExpression
* A cron expression that the age of the cache entry will be
* compared to. If the entry is older than the most recent match
* for the cron expression, the entry will be considered stale.
* @return The object from cache
* @throws NeedsRefreshException
* when no cache entry could be found with the supplied key, or
* when an entry was found but is considered out of date. If the
* cache entry is a new entry that is currently being
* constructed this method will block until the new entry
* becomes available. Similarly, it will block if a stale entry
* is currently being rebuilt by another thread and cache
* blocking is enabled (<code>cache.blocking=true</code>).
*/
public Object getFromCache(String key, int refreshPeriod, String cronExpression)

throws NeedsRefreshException {
return getCache().getFromCache(key, refreshPeriod, cronExpression);
}
還是來(lái)到了Cache的#getFromCache():

public Object getFromCache(String key, int refreshPeriod, String cronExpiry) throws NeedsRefreshException {
// 先嘗試在緩存中查找
CacheEntry cacheEntry = this.getCacheEntry(key, null, null);

// 實(shí)際緩存的對(duì)象
Object content = cacheEntry.getContent();

// 緩存訪問(wèn)事件類型,默認(rèn)是"緩存命中"
CacheMapAccessEventType accessEventType = CacheMapAccessEventType.HIT;

boolean reload = false;

// 判斷緩存是否過(guò)期

if (this.isStale(cacheEntry, refreshPeriod, cronExpiry)) {
// 得到緩存更新狀態(tài)
EntryUpdateState updateState = getUpdateState(key);

try {

synchronized (updateState) {

if (updateState.isAwaitingUpdate() || updateState.isCancelled()) {
// 如果是等待更新或者取消更新,那么將更新狀態(tài)設(shè)置成UPDATE_IN_PROGRESS
updateState.startUpdate();


if (cacheEntry.isNew()) {// 如果緩存中沒(méi)有,那么設(shè)成"沒(méi)命中"
accessEventType = CacheMapAccessEventType.MISS;

} else {// 設(shè)成"命中過(guò)期數(shù)據(jù)",這個(gè)后邊要拋異常的
accessEventType = CacheMapAccessEventType.STALE_HIT;
}

} else if (updateState.isUpdating()) {
// 如果其他線程正在更新緩存中,那么wait,等到更新完成后notify

if (cacheEntry.isNew() || blocking) {

do {

try {
updateState.wait();

} catch (InterruptedException e) {
}
} while (updateState.isUpdating());


if (updateState.isCancelled()) {
// 如果更新的線程取消了更新,那么緩存仍然是過(guò)期的,執(zhí)行和第一個(gè)分支相同的操作
updateState.startUpdate();


if (cacheEntry.isNew()) {
accessEventType = CacheMapAccessEventType.MISS;

} else {
accessEventType = CacheMapAccessEventType.STALE_HIT;
}

} else if (updateState.isComplete()) {
reload = true;

} else {
log.error("Invalid update state for cache entry " + key);
}
}

} else {
reload = true;
}
}

} finally {
// 線程引用數(shù)減少1
releaseUpdateState(updateState, key);
}
}


if (reload) {
// 可以重新載入緩存中的數(shù)據(jù)了
cacheEntry = (CacheEntry) cacheMap.get(key);


if (cacheEntry != null) {
content = cacheEntry.getContent();

} else {
log.error("Could not reload cache entry after waiting for it to be rebuilt");
}
}

// 發(fā)送緩存事件
dispatchCacheMapAccessEvent(accessEventType, cacheEntry, null);

// 如果數(shù)據(jù)過(guò)期,拋出需要刷新數(shù)據(jù)的異常

if (accessEventType != CacheMapAccessEventType.HIT) {
throw new NeedsRefreshException(content);
}

return content;
}
整個(gè)流程還是很好理解的,這里看一些細(xì)節(jié),首先是#isStale()方法:

protected boolean isStale(CacheEntry cacheEntry, int refreshPeriod, String cronExpiry) {
// 判斷是否需要刷新
boolean result = cacheEntry.needsRefresh(refreshPeriod) || isFlushed(cacheEntry);


if ((!result) && (cronExpiry != null) && (cronExpiry.length() > 0)) {

try {
// 利用計(jì)劃任務(wù)的方式處理過(guò)期
FastCronParser parser = new FastCronParser(cronExpiry);
result = result || parser.hasMoreRecentMatch(cacheEntry.getLastUpdate());

} catch (ParseException e) {
log.warn(e);
}
}

return result;
}
其次是獲取更新狀態(tài)的方法:

protected EntryUpdateState getUpdateState(String key) {
EntryUpdateState updateState;


synchronized (updateStates) {
// Try to find the matching state object in the updating entry map.
updateState = (EntryUpdateState) updateStates.get(key);


if (updateState == null) {
// It's not there so add it.
updateState = new EntryUpdateState();
updateStates.put(key, updateState);

} else {
// Otherwise indicate that we start using it to prevent its
// removal until all threads are done with it.
updateState.incrementUsageCounter();
}
}

return updateState;
}
另外就是EntryUpdateState的startUpdate()方法,它會(huì)把過(guò)期緩存的狀態(tài)設(shè)為UPDATE_IN_PROGRESS,這樣更新緩存的線程就會(huì)執(zhí)行更新操作了。

public int startUpdate() {

if ((state != NOT_YET_UPDATING) && (state != UPDATE_CANCELLED)) {
throw new IllegalStateException("Cannot begin cache update - current state (" + state
+ ") is not NOT_YET_UPDATING or UPDATE_CANCELLED");
}

state = UPDATE_IN_PROGRESS;
return incrementUsageCounter();
}
整個(gè)流程還是很好理解的,首先判斷緩存是否過(guò)期,如果過(guò)期,那么會(huì)從更新狀態(tài)緩存中查找更新狀態(tài),如果沒(méi)查到那么創(chuàng)建一個(gè),如果已經(jīng)存在了,將引用計(jì)數(shù)加1。
如果緩存還沒(méi)有進(jìn)行更新,那么將更新狀態(tài)設(shè)置為UPDATE_IN_PROGRESS,并且再次將引用計(jì)數(shù)加1,此時(shí):
(1) 在設(shè)定完更新狀態(tài)后,讀取緩存的線程會(huì)將引用計(jì)數(shù)減1,如果沒(méi)有線程此時(shí)再引用了那么在更新狀態(tài)緩存中移除此項(xiàng)。
(2) 如果其他線程在更新緩存的時(shí)候會(huì)將引用計(jì)數(shù)減1,如果沒(méi)有線程此時(shí)再引用了那么在更新狀態(tài)緩存中移除此項(xiàng)。
這樣,通過(guò)上邊的兩個(gè)過(guò)程,就將引用計(jì)數(shù)變?yōu)?了。
想法其實(shí)是非常好的,不過(guò)我在實(shí)際的測(cè)試中發(fā)現(xiàn)有時(shí)候其實(shí)并不是完全按照這個(gè)流程走下去的(當(dāng)然很有可能是當(dāng)初設(shè)計(jì)就是如此,允許發(fā)生報(bào)出這種異常):
線程1調(diào)用getFromCache()并發(fā)現(xiàn)緩存過(guò)期,更新狀態(tài)為UPDATE_IN_PROGRESS后還沒(méi)有調(diào)用#releaseUpdateState(),線程2執(zhí)行#putInCache(),線程3執(zhí)行#putInCache()。線程2更新完?duì)顟B(tài)為UPDATE_COMPLETE后將引用計(jì)數(shù)減1,但是此時(shí)由于線程1還有一個(gè)引用計(jì)數(shù),所以線程3更新狀態(tài)時(shí)會(huì)在#completeUpdate()中拋出異常,因?yàn)榇藭r(shí)的狀態(tài)是UPDATE_COMPLETE。
posted on 2010-12-15 08:45
臭美 閱讀(2075)
評(píng)論(0) 編輯 收藏 所屬分類:
Cache