其實之前我完全沒有接觸過oscache,今天突發奇想,準備看看緩存是怎么實現的,google了一下,決定看看oscache的源碼,簡單的寫了個TestCase:
@Test

public void testPojoCache() throws Exception {
TestPojo pojo = new TestPojo("0001");
pojo.setField1(100);
pojo.setField2("100");

Properties prop = new Properties();
InputStream is = this.getClass().getResourceAsStream("oscache.properties");
prop.load(is);

is.close();

GeneralCacheAdministrator cacheAdmin = new GeneralCacheAdministrator(prop);

cacheAdmin.putInCache(pojo.getId(), pojo);

TestPojo cachedObj = (TestPojo) cacheAdmin.getFromCache("0001");

assertEquals(100, cachedObj.getField1());
assertEquals("100", cachedObj.getField2());

}
所以我對這個產品的熟悉程度基本為0,各位大蝦看了后發現誤人子弟請用磚輕拍。
簡單了看了下oscache的介紹,發現它支持三種緩存方式:JSP Caching、Request Caching、General-Purpose Cache,今天的文章就是針對最后一種General-Purpose Cache的,在oscache中對應的入口是GeneralCacheAdministrator,就從這里開始吧:

public GeneralCacheAdministrator(Properties p) {
super(p);
log.info("Constructed GeneralCacheAdministrator()");
createCache();
}

// 這個是super(p)調用的構造函數

protected AbstractCacheAdministrator(Properties p) {
loadProps(p);
initCacheParameters();


if (log.isDebugEnabled()) {
log.debug("Constructed AbstractCacheAdministrator()");
}
}

// 初始化參數

private void initCacheParameters() {
algorithmClass = getProperty(CACHE_ALGORITHM_KEY);

blocking = "true".equalsIgnoreCase(getProperty(CACHE_BLOCKING_KEY));

String cacheMemoryStr = getProperty(CACHE_MEMORY_KEY);


if ((cacheMemoryStr != null) && cacheMemoryStr.equalsIgnoreCase("false")) {
memoryCaching = false;
}

unlimitedDiskCache = Boolean.valueOf(config.getProperty(CACHE_DISK_UNLIMITED_KEY)).booleanValue();
overflowPersistence = Boolean.valueOf(config.getProperty(CACHE_PERSISTENCE_OVERFLOW_KEY))
.booleanValue();

String cacheSize = getProperty(CACHE_CAPACITY_KEY);


try {

if ((cacheSize != null) && (cacheSize.length() > 0)) {
cacheCapacity = Integer.parseInt(cacheSize);
}

} catch (NumberFormatException e) {
log.error("The value supplied for the cache capacity, '" + cacheSize
+ "', is not a valid number. The cache capacity setting is being ignored.");
}
}

// 創建緩存實例

private void createCache() {
log.info("Creating new cache");

// 這里構建緩存時用到的參數都是在父類里的#initCacheParameters()中初始化的
applicationCache = new Cache(isMemoryCaching(), isUnlimitedDiskCache(), isOverflowPersistence(),
isBlocking(), algorithmClass, cacheCapacity);

configureStandardListeners(applicationCache);
}

// Cache的構造函數
public Cache(boolean useMemoryCaching, boolean unlimitedDiskCache, boolean overflowPersistence,

boolean blocking, String algorithmClass, int capacity) {
// 這里說明還是支持自定義的緩存策略的

if (((algorithmClass != null) && (algorithmClass.length() > 0)) && (capacity > 0)) {

try {
cacheMap = (AbstractConcurrentReadCache) Class.forName(algorithmClass).newInstance();
cacheMap.setMaxEntries(capacity);

} catch (Exception e) {
log.error("Invalid class name for cache algorithm class. " + e.toString());
}
}


if (cacheMap == null) {
// 選擇一個默認的策略

if (capacity > 0) {// 如果有緩存數目的限制,使用LRU
cacheMap = new LRUCache(capacity);

} else {// 否則使用Unlimited
cacheMap = new UnlimitedCache();
}
}

cacheMap.setUnlimitedDiskCache(unlimitedDiskCache);
cacheMap.setOverflowPersistence(overflowPersistence);
cacheMap.setMemoryCaching(useMemoryCaching);

this.blocking = blocking;
}

// 配置事件的listener

protected Cache configureStandardListeners(Cache cache) {

if (config.getProperty(PERSISTENCE_CLASS_KEY) != null) {
cache = setPersistenceListener(cache);
}


if (config.getProperty(CACHE_ENTRY_EVENT_LISTENERS_KEY) != null) {
// Grab all the specified listeners and add them to the cache's
// listener list. Note that listeners that implement more than
// one of the event interfaces will be added multiple times.
CacheEventListener[] listeners = getCacheEventListeners();


for (int i = 0; i < listeners.length; i++) {
// Pass through the configuration to those listeners that
// require it

if (listeners[i] instanceof LifecycleAware) {

try {
((LifecycleAware) listeners[i]).initialize(cache, config);

} catch (InitializationException e) {
log.error("Could not initialize listener '" + listeners[i].getClass().getName()
+ "'. Listener ignored.", e);

continue;
}
}


if (listeners[i] instanceof CacheEntryEventListener) {
cache.addCacheEventListener(listeners[i]);

} else if (listeners[i] instanceof CacheMapAccessEventListener) {
cache.addCacheEventListener(listeners[i]);
}
}
}

return cache;
}
這里對緩存#initCacheParameters()和#configureStandardListeners()里的參數大致了解下,代碼里有注釋良好的JavaDoc,很容易看懂。
1. cache.algorithm 緩存的策略,具體就是用哪種Map來實現緩存,默認有FIFO,LRU,Unlimited三種,也支持自定義的緩存類型的;
2. cache.blocking 是否等待新數據放入緩存,這個感覺應該是在并發讀取數據的時候如果數據在其他線程正處于寫入的狀態這個線程會wait()直到寫入線程完成寫入后notifyAll();
3. cache.memory 這個是是否使用內存緩存,這個多數情況下都應該是內存的吧;
4. cache.unlimited.disk 看介紹寫的是當對象需要持久化緩存(應該是串行化吧)時,是否使用無限的磁盤空間;
5. cache.persistence.overflow.only 這個說明持久化緩存是不是僅在溢出的方式下開啟,字面理解應該是否僅在是memory緩存不足的情況開啟持久化緩存;
6. cache.capacity 保存的對象的數目。
7. cache.persistence.class 用于持久化的類名
8. cache.event.listeners 緩存事件監聽器,多個listener使用逗號分隔開
初始化大致就這么多,還是看看GeneralCacheAdministrator#putInCache()方法吧,這里#putInCache()有4個,我挑了一個參數最多的:putInCache(String key, Object content, String[] groups, EntryRefreshPolicy policy)

/** *//**
* Puts an object in a cache
*
* @param key The unique key for this cached object
* @param content The object to store
* @param groups The groups that this object belongs to
* @param policy The refresh policy to use
*/

public void putInCache(String key, Object content, String[] groups, EntryRefreshPolicy policy) {
// 直接調用的Cache類的#putInCache
getCache().putInCache(key, content, groups, policy, null);
}

// Cache的#putInCache()方法
public void putInCache(String key, Object content, String[] groups, EntryRefreshPolicy policy,

String origin) {
// 首先查找這個key在緩存中是否已經存在,沒有就創建一個
CacheEntry cacheEntry = this.getCacheEntry(key, policy, origin);

// 判斷是否是新創建的緩存
boolean isNewEntry = cacheEntry.isNew();

// [CACHE-118] If we have an existing entry, create a new CacheEntry so
// we can still access the old one later
// 這里如果不是新的緩存也會新建一個CacheEntry,因為老的緩存值在后邊也能訪問到

if (!isNewEntry) {
cacheEntry = new CacheEntry(key, policy);
}

cacheEntry.setContent(content);
cacheEntry.setGroups(groups);

// 放入緩存
cacheMap.put(key, cacheEntry);

// Signal to any threads waiting on this update that it's now ready for them in the cache!
// 這里會通知其他在等待值的線程結束wait
completeUpdate(key);

// 針對緩存事件的listener發送事件

if (listenerList.getListenerCount() > 0) {
CacheEntryEvent event = new CacheEntryEvent(this, cacheEntry, origin);


if (isNewEntry) {
dispatchCacheEntryEvent(CacheEntryEventType.ENTRY_ADDED, event);

} else {
dispatchCacheEntryEvent(CacheEntryEventType.ENTRY_UPDATED, event);
}
}
}
先簡單的看看CacheEntry的構造函數吧,這個最基本:

public CacheEntry(String key, EntryRefreshPolicy policy, String[] groups) {
// CacheEntry中保存了key,所屬的分組(用一個HashSet保存分組的名字,有點像Tag)
// 還有刷新策略和創建的時間
this.key = key;


if (groups != null) {
this.groups = new HashSet(groups.length);


for (int i = 0; i < groups.length; i++) {
this.groups.add(groups[i]);
}
}

this.policy = policy;
this.created = System.currentTimeMillis();
}
另外還有#completeUpdate()方法:

protected void completeUpdate(String key) {
// Entry的更新狀態,有NOT_YET_UPDATING, UPDATE_IN_PROGRESS, UPDATE_COMPLETE,
// UPDATE_CANCELLED四種
EntryUpdateState state;

// Cache用一個updateStates(HashMap)來保存各個CacheEntry的更新狀態

synchronized (updateStates) {
state = (EntryUpdateState) updateStates.get(key);


if (state != null) {

synchronized (state) {
int usageCounter = state.completeUpdate();

// 通知其他線程結束等待
state.notifyAll();

// 從updateStates移除key
checkEntryStateUpdateUsage(key, state, usageCounter);
}

} else {
// If putInCache() was called directly (i.e. not as a result of
// a NeedRefreshException) then no EntryUpdateState would be
// found.
}
}
}

// EntryUpdateState的#completeUpdate()方法

public int completeUpdate() {
// 狀態不正確,在實際的測試中這個異常是可以報出來的

if (state != UPDATE_IN_PROGRESS) {
throw new IllegalStateException("Cannot complete cache update - current state (" + state
+ ") is not UPDATE_IN_PROGRESS");
}

state = UPDATE_COMPLETE;
return decrementUsageCounter();
}


private void checkEntryStateUpdateUsage(String key, EntryUpdateState state, int usageCounter) {
// Clean up the updateStates map to avoid a memory leak once no thread
// is using this EntryUpdateState instance anymore.
// 在這里,如果沒有了對這個key的"引用",那么就是updateStates中去掉它

if (usageCounter == 0) {
// 這里的remove操作感覺不光能像原來的注釋所說的避免內存泄漏,另外還能對多線程同時put,get有用處
EntryUpdateState removedState = (EntryUpdateState) updateStates.remove(key);

if (state != removedState) {

if (log.isErrorEnabled()) {

try {
throw new Exception("OSCache: internal error: removed state [" + removedState
+ "] from key [" + key + "] whereas we expected [" + state + "]");

} catch (Exception e) {
log.error(e);
}
}
}
}
}
這里要注意一個事情,其實在#getFromCache()是可以并發讀取緩存數據的,而寫入的時候只能一個線程寫入,另外在寫入的時候,更新實際的緩存和修改更新狀態是獨立開的,我在實際的測試中也發現了并發量很高的連續讀寫操作其實在#completeUpdate()方法中是會拋出異常的,不過在實際使用中這個情況應該發生的較少。
這里,整個存入緩存的大致流程就介紹完了,當處理多線程并發寫入讀取時,很多情況是要和#getFromCache()一起結合看的。
posted on 2010-12-14 08:37
臭美 閱讀(2401)
評論(0) 編輯 收藏 所屬分類:
Cache