我對于Memcached的接觸,還是在去年看了CSDN的一系列國外大型網(wǎng)站架構(gòu)設(shè)計而開始的。最初的時候只是簡單的封裝了Memcached Java版的客戶端,主要是對于配置的簡化以及Memcached多點備份作了一些工作,然后就作為ASF的組件一部分提供給其他Team使用。其實看過Memcached Java客戶端代碼的人就會了解其實客戶端的事情很簡單,就是要有一套高性能的Socket通信框架以及對Memcached的私有協(xié)議實現(xiàn)的接口,自己去做這些事情也是很簡單的,不過既然有可以滿足自己需求的開源部分,那么就去實現(xiàn)自己需要的但沒有實現(xiàn)的。這里我用的是Whalin的客戶端版本,這里為什么還要提出來講這個,后面會提到。
在對Java客戶端作了簡單封裝和擴展以后,由于其他Team使用的沒有什么特殊需求,也就沒有再去做太多的修改,直到最近自己的服務集成平臺需要做服務訪問控制,才重新豐富了Cache組件,也就是這個過程中對于Memcached的一些特性和小的細節(jié)有了一些新的認識。
作為服務集成平臺需要對服務有所監(jiān)控,包括訪問頻率控制以及訪問次數(shù)控制。頻率控制其實很類似于硬件方面的頻率控制,例如硬件可以對IP的高頻率訪問視為攻擊,列入黑名單。而作為服務的訪問,對于服務訪問者的控制其實涉及到了業(yè)務參數(shù),那么硬件就不是很適合去做這方面的控制,為此我也考慮了很久,最開始打算在Apache上做一個模塊控制,但是最后覺得還是放在后面的業(yè)務框架上做這件事情。當然后面我說說的方案可能并不好,但是也算是一種想法。要把頻繁的訪問數(shù)據(jù)記錄下來同時分析,那么數(shù)據(jù)庫肯定是不行的,最簡單的方式就是采用Cache,又因為是集群范圍內(nèi)的控制,那么集中式Cache就非Memcached莫數(shù)了(分布式的Cache傳播本身損耗太大,集中式Cache本來的最大缺點就是單點,但作簡單的備份操作就可以基本解決此類問題)。
作為解決這個問題的方法來說只需要實現(xiàn)兩部分工作:訪問計數(shù)器,定時任務。定時任務在我做日志分析框架的時候都是采用了Jdk5的Concurrent包里面的ScheduledExecutorService,這個作簡單的循環(huán)任務足夠用了,同時也是有很好的多線程異步支持,復雜一點么用Quartz。計數(shù)器就要靠Memcached來實現(xiàn)了,本來一般的Cache最大的問題就是高并發(fā)下的事務保證,如果采用Get+Set來完成計數(shù)的話,那么高并發(fā)下計數(shù)器就會出現(xiàn)讀寫不一致性的問題,幸好Memcached提供了計數(shù)累加功能,讓這種累加動作能夠在服務端一次做好,服務端控制并發(fā)寫入,保證數(shù)據(jù)的一致性。
下面就看看以下幾個方法:
boolean storeCounter(String key, long count):存儲key的計數(shù)器,值為count。
long getCounter(String key):獲取key的計數(shù)器,如果不存在返回-1。
long addOrDecr(String key, long decr):計數(shù)器值減去decr,如果計數(shù)器不存在,保存decr作為計數(shù)器值
long addOrIncr(String key, long inc):計數(shù)器值增加inc,如果計數(shù)器不存在,保存inc作為計數(shù)器值
long decr(String key, long decr):與addOrDecr不同的是在計數(shù)器不存在的時候不保存任何值,返回-1
long incr(String key, long inc) :與addOrIncr不同的是在計數(shù)器不存在的時候不保存任何值,返回-1
這里需要說明幾點:
storeCounter和普通的set方法不同,如果通過set方式置入key:value的話,getCounter等其他四個方法都認為技術(shù)器不存在。所以Counter的存儲方式是和普通內(nèi)容存儲不同的。
在不同的場景要慎用addOrXXXX和XXXX的方法,兩者還是有比較大的區(qū)別的。
計數(shù)器沒有提供移除特殊方法,使用delete方法可以移除計數(shù)器,但是頻繁的delete和addOrXXXX有時候會出現(xiàn)一些奇怪的問題(例如同名的計數(shù)器就沒有辦法再次被創(chuàng)建,不過這個還需要進一步的去研究一下看看)。一般情況下如果計數(shù)器的key不是很多,同時也會被復用,那么可以通過置為0或者減去已經(jīng)分析過的數(shù)量來復位。
有上面的一套計數(shù)器機制就可以很方便的實現(xiàn)Memcached的計數(shù)功能,但是又一個問題出現(xiàn)了,如何讓定時任務去遍歷計數(shù)器,分析計數(shù)器是否到了閥值,觸發(fā)創(chuàng)建黑名單記錄的工作。早先我同事希望我能夠提供封裝好的keySet接口,但是我自己覺得其實作為Cache來說簡單就是最重要的,Cache不需要去遍歷。首先使用Cache的角色就應該知道Key,然后去Cache里面找,找不到就去后臺例如DB里面去搜索,然后將搜索的結(jié)果在考慮更新到Cache里面,這樣才是最高效并且最可靠的,Cache靠不住阿,隨時都可能會丟失或者崩潰,因此作為類似于一級緩存或者這類數(shù)據(jù)完整性要求不高,性能要求很高的場景使用最合適。當時就沒有提供這樣的接口,直到今天自己需要了,才考慮如何去做這件事情。
開始考慮是否能夠?qū)?/span>key都記錄在另外的Cache中或者是Memcached中,首先在高并發(fā)下更新操作就是一大問題,再者Memcached的內(nèi)存分配回收機制以及Value的大小限制都不能滿足這樣的需求,如果使用數(shù)據(jù)庫,那么頻繁更新操作勢必不可行,采用異步緩存刷新又有一個時間間隔期,同時更新也不是很方便。最后考慮如果能夠讓Memcached實現(xiàn)Keyset那么就是最好的解決方案,網(wǎng)上搜索了一下,找到一種策略,然后自己優(yōu)化了一下,優(yōu)化后的代碼如下:
@SuppressWarnings("unchecked")
public Set keySet(int limit,boolean fast)
{
Set<String> keys = new HashSet<String>();
Map<String,Integer> dumps = new HashMap<String,Integer>();
Map slabs = getCacheClient().statsItems();
if (slabs != null && slabs.keySet() != null)
{
Iterator itemsItr = slabs.keySet().iterator();
while(itemsItr.hasNext())
{
String server = itemsItr.next().toString();
Map itemNames = (Map) slabs.get(server);
Iterator itemNameItr = itemNames.keySet().iterator();
while(itemNameItr.hasNext())
{
String itemName = itemNameItr.next().toString();
// itemAtt[0] = itemname
// itemAtt[1] = number
// itemAtt[2] = field
String[] itemAtt = itemName.split(":");
if (itemAtt[2].startsWith("number"))
dumps.put(itemAtt[1], Integer.parseInt(itemAtt[1]));
}
}
if (!dumps.values().isEmpty())
{
Iterator<Integer> dumpIter = dumps.values().iterator();
while(dumpIter.hasNext())
{
int dump = dumpIter.next();
Map cacheDump = statsCacheDump(dump,limit);
Iterator entryIter = cacheDump.values().iterator();
while (entryIter.hasNext())
{
Map items = (Map)entryIter.next();
Iterator ks = items.keySet().iterator();
while(ks.hasNext())
{
String k = (String)ks.next();
try
{
k = URLDecoder.decode(k,"UTF-8");
}
catch(Exception ex)
{
Logger.error(ex);
}
if (k != null && !k.trim().equals(""))
{
if (fast)
keys.add(k);
else
if (containsKey(k))
keys.add(k);
}
}
}
}
}
}
return keys;
}
對于上面代碼的了解需要從Memcached內(nèi)存分配和回收機制開始,以前接觸Memcached的時候只是了解,這部分代碼寫了以后就有些知道怎么回事了。Memcached為了提高內(nèi)存的分配和回收效率,采用了slab和dump分區(qū)的概念。Memcached一大優(yōu)勢就是能夠充分利用Memory資源,將同機器或者不同機器的Memcached服務端組合成為對客戶端看似統(tǒng)一的存儲空間,Memcached可以在一臺機器上開多個端口作為服務端多個實例,也可以在多臺機器上開多個服務實例,而slab就是Memcached的服務端。下面是我封裝后的Cache配置:
<?xml version="1.0" encoding="UTF-8"?>
<memcached>
<client name="mclient0" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool0">
<!--errorHandler></errorHandler-->
</client>
<client name="mclient1" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool1">
<!--errorHandler></errorHandler-->
</client>
<client name="mclient11" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool11">
<!--errorHandler></errorHandler-->
</client>
<socketpool name="pool0" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"
nagle="false" socketTO="3000" aliveCheck="true">
<servers>10.2.225.210:13000,10.2.225.210:13001,10.2.225.210:13002</servers>
</socketpool>
<socketpool name="pool1" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"
nagle="false" socketTO="3000" aliveCheck="true">
<servers>10.2.225.210:13000</servers>
</socketpool>
<socketpool name="pool11" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"
nagle="false" socketTO="3000" aliveCheck="true">
<servers>10.2.225.210:13000</servers>
</socketpool>
<cluster name="cluster1">
<memCachedClients>mclient1,mclient11</memCachedClients>
</cluster>
</memcached>
可以看到其實pool才是最終連接服務端的配置,看看pool0,它會連接10.2.225.210:13000,10.2.225.210:13001,10.2.225.210:13002這些機器和他們的端口,但是對于使用pool0的mclient0來說它僅僅只是知道有一個叫做mclient0的cache可以保存數(shù)據(jù)。此時slab就有三個:10.2.225.210:13000和10.2.225.210:13001和10.2.225.210:13002。
當一個key:value要被放入到Memcached中,首先Memcached會根據(jù)key的hash算法獲取到hash值來選擇被分配的slab,然后根據(jù)value選擇適合的dump區(qū)。所謂dump區(qū)其實就是根據(jù)value的大小來將內(nèi)存按照存儲單元內(nèi)容大小分頁。這個是可以配置Memcached的,例如Memcached將slab中的內(nèi)存劃分成4個dump,第一dump區(qū)存儲0-50k大小的數(shù)據(jù),第二dump區(qū)存儲50-100k的數(shù)據(jù),第三dump區(qū)存儲100-500k的數(shù)據(jù),第四dump區(qū)存儲500-1000K的數(shù)據(jù)。那么當key:value需要被寫入的時候,很容易定位到value所處的dump,分配內(nèi)存給value。這種分dump模式簡化內(nèi)存管理,加速了內(nèi)存回收和分配。但是這里需要注意的幾點就是,首先當你的應用場景中保存的數(shù)據(jù)大小離散度很高,那么就不是很適合Memcached的這種分配模式,容易造成浪費,例如第一dump區(qū)已經(jīng)滿了,第二第三dump區(qū)都還是只有一個數(shù)據(jù),那么第二第三dump區(qū)不會被回收,第二第三dump區(qū)的空間就浪費了。同時Memcached對于value的大小支持到1M,大于1M的內(nèi)容不適合Memcached存儲。其實在Cache的設(shè)計中這樣的情況發(fā)生本來就證明設(shè)計有問題,Cache只是加速,一般保存都是較小的id或者小對象,用來驗證以及為數(shù)據(jù)定位作精準細化,而大數(shù)據(jù)量的內(nèi)容還是在數(shù)據(jù)庫等存儲中。
知道了基本的分配機制以后再回過頭來看看代碼:
Map slabs = getCacheClient().statsItems();//獲取所有的slab
//用來收集所有slab的dump號
while(itemsItr.hasNext())
{
String server = itemsItr.next().toString();
Map itemNames = (Map) slabs.get(server);
Iterator itemNameItr = itemNames.keySet().iterator();
while(itemNameItr.hasNext())
{
String itemName = itemNameItr.next().toString();
// itemAtt[0] = itemname
// itemAtt[1] = number
// itemAtt[2] = field
String[] itemAtt = itemName.split(":");
// 如果是itemName中是:number來表示,那么證明是一個存儲數(shù)據(jù)的dump,還有一些是age的部分
if (itemAtt[2].startsWith("number"))
dumps.put(itemAtt[1], Integer.parseInt(itemAtt[1]));
}
}
//根據(jù)收集到的dump來獲取keys
if (!dumps.values().isEmpty())
{
Iterator<Integer> dumpIter = dumps.values().iterator();
while(dumpIter.hasNext())
{
int dump = dumpIter.next();
// statsCacheDump支持三個參數(shù)String[],int,int,第一個參數(shù)可以省略,默認填入null,表示從那些slab中獲取dump號為第二個參數(shù)的keys,如果是null就從當前所有的slab中獲取。第二個參數(shù)表示dump號,第三個參數(shù)表示返回最多多少個結(jié)果。
Map cacheDump = statsCacheDump(dump,limit);
Iterator entryIter = cacheDump.values().iterator();
while (entryIter.hasNext())
{
Map items = (Map)entryIter.next();
Iterator ks = items.keySet().iterator();
while(ks.hasNext())
{
String k = (String)ks.next();
try
{
//這里為什么要作decode,因為其實在我使用的這個java客戶端存儲的時候,默認會把key都作encoding一次,所以必須要做,不然會出現(xiàn)問題。
k = URLDecoder.decode(k,"UTF-8");
}
catch(Exception ex)
{
Logger.error(ex);
}
if (k != null && !k.trim().equals(""))
{
//這里的fast參數(shù)是在方法參數(shù)中傳入,作用是什么,其實采用這種搜索slab以及dump的方式獲取keys會發(fā)現(xiàn)返回的可能還有一些已經(jīng)移除的內(nèi)容的keys,如果覺得需要準確的keys,就在做一次contains的檢查,不過速度就會有一定的影響。
if (fast)
keys.add(k);
else
if (containsKey(k))
keys.add(k);
}
}
}
}
}
至此,整個keySet的問題解決了,對于即時監(jiān)控也基本都作好了,這里需要把過程中的兩件小事情說一下。
1. statsCacheDump始終不能用。
剛開始的時候statsCacheDump方法始終報錯說連接超時,跟蹤到了java客戶端代碼中發(fā)現(xiàn)并不是什么連接超時,只是服務端返回了錯誤信息,而客戶端認為還沒有結(jié)束一直等待,導致超時。我就順手給java客戶端的開發(fā)人員mail了信息求助(代碼里面有email)。再仔細看了看出錯信息,返回的是不認識該指令的錯誤,因此就去解壓memcached的服務端,看了看它的協(xié)議說明,這個Stat方法還是有的,很奇怪,沒有辦法了,雖然自己對于c不是很懂,但起碼大致看懂邏輯還是不難,下載了Memcached的源碼一看,發(fā)現(xiàn)居然對于StatsCacheDump這個方法調(diào)用必須還有一個參數(shù)limit,在我手頭的客戶端代碼里面就沒有這個參數(shù),所以錯誤了,本來想擴展一下那個方法,但是那個方法中實現(xiàn)的不是很好,都是private的不容易擴展,這時候居然收到其中一個客戶端開發(fā)者的回復郵件,說我手頭的代碼太老了,同時不建議去實現(xiàn)keyset,認為這樣比較低效。我去下載了一個新版本,看了看源碼果然已經(jīng)修復了,我就回了郵件表示感謝,同時也和他說明了這么做的原因。因此大家如果要和我一樣寫上面的代碼,就需要它2.0.1的那個版本。這里對那些國外的開源工作者表示敬佩,對于開發(fā)者是很負責任的。
2.關(guān)于fast那個選項
這個是我加上去的,做了一下測試,例如我先執(zhí)行如下代碼:
Cache.set(“key1”,”value1”);
Cache.set(“key2”,”value2”);
Cache.flushAll(null);
Cache.set(“key3”,”value3”);
Cache.set(“key4”,”value4”);
Boolean fast = true;
Set keys = Cache.keySet(fast);
System.out.println(keys);
Fast = false;
keys = Cache.keySet(fast);
System.out.println(keys);
得到的結(jié)果為:
Key1,key2,key3,key4
Key3,key4
可以看到其實如果通過StatsCacheDump來獲取得到的keys會參雜一些已經(jīng)失效的keys,只是沒有回收,本來嘗試獲取時間戳來做判斷,不過還不如使用containsKey來的有效。
同時這里采用containsKey而不是用get,就是因為counter是不能用get獲得的,即使counter存在。
這些就是今天在使用Memcached所收獲的,分享一下,如果有一些理解上的偏差也希望能夠被指出。