CONAN ZONE

          你越掙扎我就越興奮

          BlogJava 首頁 新隨筆 聯系 聚合 管理
            0 Posts :: 282 Stories :: 0 Comments :: 0 Trackbacks
          我對于Memcached的接觸,還是在去年看了CSDN的一系列國外大型網站架構設計而開始的。最初的時候只是簡單的封裝了Memcached Java版的客戶端,主要是對于配置的簡化以及Memcached多點備份作了一些工作,然后就作為ASF的組件一部分提供給其他Team使用。其實看過Memcached Java客戶端代碼的人就會了解其實客戶端的事情很簡單,就是要有一套高性能的Socket通信框架以及對Memcached的私有協議實現的接口,自己去做這些事情也是很簡單的,不過既然有可以滿足自己需求的開源部分,那么就去實現自己需要的但沒有實現的。這里我用的是Whalin的客戶端版本,這里為什么還要提出來講這個,后面會提到。

                 在對Java客戶端作了簡單封裝和擴展以后,由于其他Team使用的沒有什么特殊需求,也就沒有再去做太多的修改,直到最近自己的服務集成平臺需要做服務訪問控制,才重新豐富了Cache組件,也就是這個過程中對于Memcached的一些特性和小的細節有了一些新的認識。

                 作為服務集成平臺需要對服務有所監控,包括訪問頻率控制以及訪問次數控制。頻率控制其實很類似于硬件方面的頻率控制,例如硬件可以對IP的高頻率訪問視為攻擊,列入黑名單。而作為服務的訪問,對于服務訪問者的控制其實涉及到了業務參數,那么硬件就不是很適合去做這方面的控制,為此我也考慮了很久,最開始打算在Apache上做一個模塊控制,但是最后覺得還是放在后面的業務框架上做這件事情。當然后面我說說的方案可能并不好,但是也算是一種想法。要把頻繁的訪問數據記錄下來同時分析,那么數據庫肯定是不行的,最簡單的方式就是采用Cache,又因為是集群范圍內的控制,那么集中式Cache就非Memcached莫數了(分布式的Cache傳播本身損耗太大,集中式Cache本來的最大缺點就是單點,但作簡單的備份操作就可以基本解決此類問題)。

                 作為解決這個問題的方法來說只需要實現兩部分工作:訪問計數器,定時任務。定時任務在我做日志分析框架的時候都是采用了Jdk5Concurrent包里面的ScheduledExecutorService,這個作簡單的循環任務足夠用了,同時也是有很好的多線程異步支持,復雜一點么用Quartz。計數器就要靠Memcached來實現了,本來一般的Cache最大的問題就是高并發下的事務保證,如果采用Get+Set來完成計數的話,那么高并發下計數器就會出現讀寫不一致性的問題,幸好Memcached提供了計數累加功能,讓這種累加動作能夠在服務端一次做好,服務端控制并發寫入,保證數據的一致性。

          下面就看看以下幾個方法:

          boolean storeCounter(String key, long count):存儲key的計數器,值為count

          long getCounter(String key):獲取key的計數器,如果不存在返回-1

          long addOrDecr(String key, long decr):計數器值減去decr,如果計數器不存在,保存decr作為計數器值

          long addOrIncr(String key, long inc):計數器值增加inc,如果計數器不存在,保存inc作為計數器值

          long decr(String key, long decr):與addOrDecr不同的是在計數器不存在的時候不保存任何值,返回-1

          long incr(String key, long inc) :與addOrIncr不同的是在計數器不存在的時候不保存任何值,返回-1

          這里需要說明幾點:

          storeCounter和普通的set方法不同,如果通過set方式置入key:value的話,getCounter等其他四個方法都認為技術器不存在。所以Counter的存儲方式是和普通內容存儲不同的。

          在不同的場景要慎用addOrXXXXXXXX的方法,兩者還是有比較大的區別的。

          計數器沒有提供移除特殊方法,使用delete方法可以移除計數器,但是頻繁的deleteaddOrXXXX有時候會出現一些奇怪的問題(例如同名的計數器就沒有辦法再次被創建,不過這個還需要進一步的去研究一下看看)。一般情況下如果計數器的key不是很多,同時也會被復用,那么可以通過置為0或者減去已經分析過的數量來復位。

                 有上面的一套計數器機制就可以很方便的實現Memcached的計數功能,但是又一個問題出現了,如何讓定時任務去遍歷計數器,分析計數器是否到了閥值,觸發創建黑名單記錄的工作。早先我同事希望我能夠提供封裝好的keySet接口,但是我自己覺得其實作為Cache來說簡單就是最重要的,Cache不需要去遍歷。首先使用Cache的角色就應該知道Key,然后去Cache里面找,找不到就去后臺例如DB里面去搜索,然后將搜索的結果在考慮更新到Cache里面,這樣才是最高效并且最可靠的,Cache靠不住阿,隨時都可能會丟失或者崩潰,因此作為類似于一級緩存或者這類數據完整性要求不高,性能要求很高的場景使用最合適。當時就沒有提供這樣的接口,直到今天自己需要了,才考慮如何去做這件事情。

                 開始考慮是否能夠將key都記錄在另外的Cache中或者是Memcached中,首先在高并發下更新操作就是一大問題,再者Memcached的內存分配回收機制以及Value的大小限制都不能滿足這樣的需求,如果使用數據庫,那么頻繁更新操作勢必不可行,采用異步緩存刷新又有一個時間間隔期,同時更新也不是很方便。最后考慮如果能夠讓Memcached實現Keyset那么就是最好的解決方案,網上搜索了一下,找到一種策略,然后自己優化了一下,優化后的代碼如下:

              @SuppressWarnings("unchecked")

              public Set keySet(int limit,boolean fast)

              {

                 Set<String> keys = new HashSet<String>();

                 Map<String,Integer> dumps = new HashMap<String,Integer>();

                      

                 Map slabs = getCacheClient().statsItems();

                

                 if (slabs != null && slabs.keySet() != null)

                 {

                     Iterator itemsItr = slabs.keySet().iterator();

                    

                     while(itemsItr.hasNext())

                     {

                        String server = itemsItr.next().toString();

                        Map itemNames = (Map) slabs.get(server);

                        Iterator itemNameItr = itemNames.keySet().iterator();

                       

                        while(itemNameItr.hasNext())

                        {

                            String itemName = itemNameItr.next().toString();

                           

                            // itemAtt[0] = itemname

                             // itemAtt[1] = number

                             // itemAtt[2] = field

                             String[] itemAtt = itemName.split(":");

                            

                             if (itemAtt[2].startsWith("number"))

                                 dumps.put(itemAtt[1], Integer.parseInt(itemAtt[1]));

                        }

                     }

                    

                     if (!dumps.values().isEmpty())

                     {

                        Iterator<Integer> dumpIter = dumps.values().iterator();

                       

                        while(dumpIter.hasNext())

                        {

                            int dump = dumpIter.next();

                           

                            Map cacheDump = statsCacheDump(dump,limit);

                           

                            Iterator entryIter = cacheDump.values().iterator();

                           

                            while (entryIter.hasNext())

                             {

                                 Map items = (Map)entryIter.next();

                                

                                 Iterator ks = items.keySet().iterator();

                                

                                 while(ks.hasNext())

                                 {

                                    String k = (String)ks.next();

                                   

                                    try

                                    {

                                        k = URLDecoder.decode(k,"UTF-8");

                                    }

                                    catch(Exception ex)

                                    {

                                        Logger.error(ex);

                                    }

                                    if (k != null && !k.trim().equals(""))

                                    {

                                        if (fast)

                                           keys.add(k);

                                        else

                                           if (containsKey(k))

                                               keys.add(k);

                                    }

                                 }

                             }

                           

                        }

                     }

                 }

                

                 return keys;

              }  

          對于上面代碼的了解需要從Memcached內存分配和回收機制開始,以前接觸Memcached的時候只是了解,這部分代碼寫了以后就有些知道怎么回事了。Memcached為了提高內存的分配和回收效率,采用了slabdump分區的概念。Memcached一大優勢就是能夠充分利用Memory資源,將同機器或者不同機器的Memcached服務端組合成為對客戶端看似統一的存儲空間,Memcached可以在一臺機器上開多個端口作為服務端多個實例,也可以在多臺機器上開多個服務實例,而slab就是Memcached的服務端。下面是我封裝后的Cache配置:

          <?xml version="1.0" encoding="UTF-8"?>

          <memcached>

             

              <client name="mclient0" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool0">

                  <!--errorHandler></errorHandler-->

              </client>

             

              <client name="mclient1" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool1">

                  <!--errorHandler></errorHandler-->

              </client>

             

              <client name="mclient11" compressEnable="true" defaultEncoding="UTF-8" socketpool="pool11">

                  <!--errorHandler></errorHandler-->

              </client>

             

              <socketpool name="pool0" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"

                  nagle="false" socketTO="3000" aliveCheck="true">

                  <servers>10.2.225.210:13000,10.2.225.210:13001,10.2.225.210:13002</servers>

              </socketpool> 

             

              <socketpool name="pool1" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"

                  nagle="false" socketTO="3000" aliveCheck="true">

                  <servers>10.2.225.210:13000</servers>

              </socketpool>  

              <socketpool name="pool11" failover="true" initConn="10" minConn="5" maxConn="250" maintSleep="0"

                  nagle="false" socketTO="3000" aliveCheck="true">

                  <servers>10.2.225.210:13000</servers>

              </socketpool>  

              <cluster name="cluster1">

                  <memCachedClients>mclient1,mclient11</memCachedClients>

              </cluster>

          </memcached>

          可以看到其實pool才是最終連接服務端的配置,看看pool0,它會連接10.2.225.210:13000,10.2.225.210:13001,10.2.225.210:13002這些機器和他們的端口,但是對于使用pool0mclient0來說它僅僅只是知道有一個叫做mclient0cache可以保存數據。此時slab就有三個:10.2.225.210:1300010.2.225.210:1300110.2.225.210:13002

          當一個key:value要被放入到Memcached中,首先Memcached會根據keyhash算法獲取到hash值來選擇被分配的slab,然后根據value選擇適合的dump區。所謂dump區其實就是根據value的大小來將內存按照存儲單元內容大小分頁。這個是可以配置Memcached的,例如Memcachedslab中的內存劃分成4dump,第一dump區存儲0-50k大小的數據,第二dump區存儲50-100k的數據,第三dump區存儲100-500k的數據,第四dump區存儲500-1000K的數據。那么當key:value需要被寫入的時候,很容易定位到value所處的dump,分配內存給value。這種分dump模式簡化內存管理,加速了內存回收和分配。但是這里需要注意的幾點就是,首先當你的應用場景中保存的數據大小離散度很高,那么就不是很適合Memcached的這種分配模式,容易造成浪費,例如第一dump區已經滿了,第二第三dump區都還是只有一個數據,那么第二第三dump區不會被回收,第二第三dump區的空間就浪費了。同時Memcached對于value的大小支持到1M,大于1M的內容不適合Memcached存儲。其實在Cache的設計中這樣的情況發生本來就證明設計有問題,Cache只是加速,一般保存都是較小的id或者小對象,用來驗證以及為數據定位作精準細化,而大數據量的內容還是在數據庫等存儲中。

          知道了基本的分配機制以后再回過頭來看看代碼:

          Map slabs = getCacheClient().statsItems();//獲取所有的slab

          //用來收集所有slabdump

          while(itemsItr.hasNext())

                     {

                        String server = itemsItr.next().toString();

                        Map itemNames = (Map) slabs.get(server);

                        Iterator itemNameItr = itemNames.keySet().iterator();

                       

                        while(itemNameItr.hasNext())

                        {

                            String itemName = itemNameItr.next().toString();

                           

                            // itemAtt[0] = itemname

                             // itemAtt[1] = number

                             // itemAtt[2] = field

                             String[] itemAtt = itemName.split(":");

                            

          // 如果是itemName中是:number來表示,那么證明是一個存儲數據的dump,還有一些是age的部分

                             if (itemAtt[2].startsWith("number"))

                             dumps.put(itemAtt[1], Integer.parseInt(itemAtt[1]));

                        }

                     }

                

                  //根據收集到的dump來獲取keys

          if (!dumps.values().isEmpty())

                     {

                        Iterator<Integer> dumpIter = dumps.values().iterator();

                       

                        while(dumpIter.hasNext())

                        {

                            int dump = dumpIter.next();

                           

          // statsCacheDump支持三個參數String[],int,int,第一個參數可以省略,默認填入null,表示從那些slab中獲取dump號為第二個參數的keys,如果是null就從當前所有的slab中獲取。第二個參數表示dump號,第三個參數表示返回最多多少個結果。

                            Map cacheDump = statsCacheDump(dump,limit);

                           

                            Iterator entryIter = cacheDump.values().iterator();

                           

                            while (entryIter.hasNext())

                             {

                                  Map items = (Map)entryIter.next();

                              

                                  Iterator ks = items.keySet().iterator();

                              

                              while(ks.hasNext())

                              {

                                  String k = (String)ks.next();

                                 

                                  try

                                  {

          //這里為什么要作decode,因為其實在我使用的這個java客戶端存儲的時候,默認會把key都作encoding一次,所以必須要做,不然會出現問題。

                                      k = URLDecoder.decode(k,"UTF-8");

                                  }

                                  catch(Exception ex)

                                  {

                                      Logger.error(ex);

                                  }

                                  if (k != null && !k.trim().equals(""))

                                  {

          //這里的fast參數是在方法參數中傳入,作用是什么,其實采用這種搜索slab以及dump的方式獲取keys會發現返回的可能還有一些已經移除的內容的keys,如果覺得需要準確的keys,就在做一次contains的檢查,不過速度就會有一定的影響。

                                      if (fast)

                                         keys.add(k);

                                      else

                                         if (containsKey(k))

                                             keys.add(k);

                                  }

                              }

                             }

                           

                        }

                     }

          至此,整個keySet的問題解決了,對于即時監控也基本都作好了,這里需要把過程中的兩件小事情說一下。

          1.    statsCacheDump始終不能用。

          剛開始的時候statsCacheDump方法始終報錯說連接超時,跟蹤到了java客戶端代碼中發現并不是什么連接超時,只是服務端返回了錯誤信息,而客戶端認為還沒有結束一直等待,導致超時。我就順手給java客戶端的開發人員mail了信息求助(代碼里面有email)。再仔細看了看出錯信息,返回的是不認識該指令的錯誤,因此就去解壓memcached的服務端,看了看它的協議說明,這個Stat方法還是有的,很奇怪,沒有辦法了,雖然自己對于c不是很懂,但起碼大致看懂邏輯還是不難,下載了Memcached的源碼一看,發現居然對于StatsCacheDump這個方法調用必須還有一個參數limit,在我手頭的客戶端代碼里面就沒有這個參數,所以錯誤了,本來想擴展一下那個方法,但是那個方法中實現的不是很好,都是private的不容易擴展,這時候居然收到其中一個客戶端開發者的回復郵件,說我手頭的代碼太老了,同時不建議去實現keyset,認為這樣比較低效。我去下載了一個新版本,看了看源碼果然已經修復了,我就回了郵件表示感謝,同時也和他說明了這么做的原因。因此大家如果要和我一樣寫上面的代碼,就需要它2.0.1的那個版本。這里對那些國外的開源工作者表示敬佩,對于開發者是很負責任的。

          2.關于fast那個選項

              這個是我加上去的,做了一下測試,例如我先執行如下代碼:

              Cache.set(“key1”,”value1”);

          Cache.set(“key2”,”value2”);

          Cache.flushAll(null);

          Cache.set(“key3”,”value3”);

          Cache.set(“key4”,”value4”);

          Boolean fast = true;

          Set keys = Cache.keySet(fast);

          System.out.println(keys);

          Fast = false;

          keys = Cache.keySet(fast);

          System.out.println(keys);

          得到的結果為:

          Key1,key2,key3,key4

          Key3,key4

          可以看到其實如果通過StatsCacheDump來獲取得到的keys會參雜一些已經失效的keys,只是沒有回收,本來嘗試獲取時間戳來做判斷,不過還不如使用containsKey來的有效。

          同時這里采用containsKey而不是用get,就是因為counter是不能用get獲得的,即使counter存在。

          這些就是今天在使用Memcached所收獲的,分享一下,如果有一些理解上的偏差也希望能夠被指出

          轉載:http://www.aygfsteel.com/cenwenchu/archive/2008/06/04/205942.html
          posted on 2009-08-26 11:43 CONAN 閱讀(228) 評論(0)  編輯  收藏 所屬分類: 其他技術
          主站蜘蛛池模板: 肥东县| 平原县| 兴安盟| 丰台区| 延庆县| 平舆县| 大兴区| 奇台县| 闽清县| 昔阳县| 仁化县| 伊宁市| 阿拉善盟| 武陟县| 南充市| 彭山县| 扎囊县| 聂拉木县| 福贡县| 塔城市| 凤翔县| 岑溪市| 招远市| 灌南县| 高台县| 宝兴县| 栾川县| 扶余县| 铜鼓县| 门源| 平凉市| 连江县| 彰化县| 益阳市| 隆林| 大竹县| 怀宁县| 读书| 遂平县| 图们市| 精河县|