xmemcached發(fā)布1.0-BETA版
Posted on 2009-03-09 15:32 dennis 閱讀(1941) 評(píng)論(1) 編輯 收藏 所屬分類: java 、my open-source xmemcached發(fā)布1.0-beta,從0.60直接到1.0-beta,主要改進(jìn)如下:
1、支持更多協(xié)議,在已有協(xié)議支持的基礎(chǔ)上添加了append、prepend、gets、批量gets、cas協(xié)議的支持,具體請(qǐng)查看XMemcachedClient類的實(shí)例方法。重點(diǎn)是cas操作,下文將詳細(xì)描述下。
2、memcached分布支持,支持連接多個(gè)memcached server,支持簡(jiǎn)單的余數(shù)分布和一致性哈希分布。
3、0.60版本以來的bug修復(fù)。
memcached 1.2.4之后開始支持cas協(xié)議,該協(xié)議存儲(chǔ)數(shù)據(jù)同時(shí)發(fā)送一個(gè)版本號(hào),只有當(dāng)這個(gè)版本號(hào)與memcached server上該key的最新版本一致時(shí)才更新成功,否則返回EXISTS,版本號(hào)的獲取需要通過gets協(xié)議獲得,cas全稱就是compare and set,如果對(duì)hibernate樂觀鎖和java.util.concurrent.atomic包都比較熟悉的話這個(gè)概念應(yīng)該很了解了。xmemcached 1.0-beta開始支持cas協(xié)議,看例子:
XMemcachedClient.cas(final String key, final int exp, Object value, long cas)將嘗試更新key的值到value,如果失敗就返回false。這樣搞好像很麻煩,需要先gets獲取cas值,然后再調(diào)用cas方法更新,因此XMemcached提供了一個(gè)包裝類可以幫你搞定這兩步,并且提供重試機(jī)制:
最高重試次數(shù)設(shè)置成了50,觀察輸出你就會(huì)知道cas沖突在高并發(fā)下非常頻繁,這個(gè)操作應(yīng)當(dāng)慎用。
說完cas,我們?cè)賮砜聪聏memcached對(duì)分布的支持。
1、如何添加多個(gè)memcached server?
通過XMemcachdClient.addServer(String ip,int port)方法,
2、怎么分布?
在添加了>=2個(gè)memcached server后,對(duì)XMemcachdClient的存儲(chǔ)、刪除等操作都將默認(rèn)根據(jù)key的哈希值模連接數(shù)的余數(shù)做分布,這也是spymemcached默認(rèn)的分布算法。這個(gè)算法簡(jiǎn)單快速,然而在添加或者移除memcached server后,緩存會(huì)大面積失效需要重組,這個(gè)代價(jià)太高,因此還有所謂Consistent Hashing算法,通過將memcached節(jié)點(diǎn)分布在一個(gè)0-2^128-1的環(huán)上,發(fā)送數(shù)據(jù)到某個(gè)節(jié)點(diǎn)經(jīng)過的跳躍次數(shù)可以縮減到O(lgn)次,并且在添加或者移除節(jié)點(diǎn)時(shí)最大限度的降低影響,這個(gè)算法的思想其實(shí)來源于p2p網(wǎng)絡(luò)的路由算法,不過路由算法比這個(gè)復(fù)雜多了,畢竟memcached的分布是在客戶端,因此不需要節(jié)點(diǎn)之間的通訊和路由表的存儲(chǔ)更新等。這個(gè)算法在java上的實(shí)現(xiàn)可以通過TreeMap紅黑樹,具體可以參考這里和這里。
在xmemcached啟動(dòng)Consistent Hashing如下:
散列函數(shù)采用CRC32,你也可以采用其他散列函數(shù),具體看場(chǎng)景測(cè)試而定,散列函數(shù)決定了你的查找節(jié)點(diǎn)效率和緩存重新分布的均衡程度。
在完成1.0-beta這個(gè)里程碑版本后,xmemcached將集中于穩(wěn)定性方面的測(cè)試和性能優(yōu)化。歡迎提交測(cè)試報(bào)告和建議,我的email killme2008@gmail.com
1、支持更多協(xié)議,在已有協(xié)議支持的基礎(chǔ)上添加了append、prepend、gets、批量gets、cas協(xié)議的支持,具體請(qǐng)查看XMemcachedClient類的實(shí)例方法。重點(diǎn)是cas操作,下文將詳細(xì)描述下。
2、memcached分布支持,支持連接多個(gè)memcached server,支持簡(jiǎn)單的余數(shù)分布和一致性哈希分布。
3、0.60版本以來的bug修復(fù)。
memcached 1.2.4之后開始支持cas協(xié)議,該協(xié)議存儲(chǔ)數(shù)據(jù)同時(shí)發(fā)送一個(gè)版本號(hào),只有當(dāng)這個(gè)版本號(hào)與memcached server上該key的最新版本一致時(shí)才更新成功,否則返回EXISTS,版本號(hào)的獲取需要通過gets協(xié)議獲得,cas全稱就是compare and set,如果對(duì)hibernate樂觀鎖和java.util.concurrent.atomic包都比較熟悉的話這個(gè)概念應(yīng)該很了解了。xmemcached 1.0-beta開始支持cas協(xié)議,看例子:
XMemcachedClient client = new XMemcachedClient();
client.addServer("localhost",11211);
client.set("a", 0, 1); //設(shè)置a為1
GetsResponse result = client.gets("a");
long cas = result.getCas(); //獲取當(dāng)前cas
//嘗試更新a成2
if (!client.cas("a", 0, 2, cas))
System.err.println("cas error");
client.addServer("localhost",11211);
client.set("a", 0, 1); //設(shè)置a為1
GetsResponse result = client.gets("a");
long cas = result.getCas(); //獲取當(dāng)前cas
//嘗試更新a成2
if (!client.cas("a", 0, 2, cas))
System.err.println("cas error");
XMemcachedClient.cas(final String key, final int exp, Object value, long cas)將嘗試更新key的值到value,如果失敗就返回false。這樣搞好像很麻煩,需要先gets獲取cas值,然后再調(diào)用cas方法更新,因此XMemcached提供了一個(gè)包裝類可以幫你搞定這兩步,并且提供重試機(jī)制:
/**
* 合并gets和cas,利用CASOperation
*/
client.cas("a", 0, new CASOperation() {
@Override
public int getMaxTries() {
return 10;
}
@Override
public Object getNewValue(long currentCAS, Object currentValue) {
System.out.println("current value " + currentValue);
return 2;
}
});
通過CASOperation,你只要實(shí)現(xiàn)兩個(gè)方法即可,getMaxTries返回最大重試次數(shù),超過這個(gè)次數(shù)還沒有更新成功就拋出TimeoutException;getNewValue方法返回依據(jù)當(dāng)前cas和緩存值,你希望設(shè)置的更新值。看一個(gè)cas更詳細(xì)的例子,開100個(gè)線程遞增緩沖中的變量a,采用cas才能保證最后a會(huì)等于100:* 合并gets和cas,利用CASOperation
*/
client.cas("a", 0, new CASOperation() {
@Override
public int getMaxTries() {
return 10;
}
@Override
public Object getNewValue(long currentCAS, Object currentValue) {
System.out.println("current value " + currentValue);
return 2;
}
});
import java.util.concurrent.CountDownLatch;
import net.rubyeye.xmemcached.CASOperation;
import net.rubyeye.xmemcached.XMemcachedClient;
/**
* 測(cè)試CAS
* @author dennis
*/
class CASThread extends Thread {
private XMemcachedClient mc;
private CountDownLatch cd;
public CASThread(XMemcachedClient mc, CountDownLatch cdl) {
super();
this.mc = mc;
this.cd = cdl;
}
public void run() {
try {
if (mc.cas("a", 0, new CASOperation() {
@Override
public int getMaxTries() {
return 50;
}
@Override
public Object getNewValue(long currentCAS, Object currentValue) {
System.out.println("currentValue=" + currentValue
+ ",currentCAS=" + currentCAS);
return ((Integer) currentValue).intValue() + 1;
}
}))
this.cd.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class CASTest {
static int NUM = 100;
public static void main(String[] args) throws Exception {
XMemcachedClient mc = new XMemcachedClient();
mc.addServer("192.168.222.100", 11211);
// 設(shè)置初始值為0
mc.set("a", 0, 0);
CountDownLatch cdl = new CountDownLatch(NUM);
// 開NUM個(gè)線程遞增變量a
for (int i = 0; i < NUM; i++)
new CASThread(mc, cdl).start();
cdl.await();
// 打印結(jié)果,最后結(jié)果應(yīng)該為NUM
System.out.println("result=" + mc.get("a"));
mc.shutdown();
}
}
import net.rubyeye.xmemcached.CASOperation;
import net.rubyeye.xmemcached.XMemcachedClient;
/**
* 測(cè)試CAS
* @author dennis
*/
class CASThread extends Thread {
private XMemcachedClient mc;
private CountDownLatch cd;
public CASThread(XMemcachedClient mc, CountDownLatch cdl) {
super();
this.mc = mc;
this.cd = cdl;
}
public void run() {
try {
if (mc.cas("a", 0, new CASOperation() {
@Override
public int getMaxTries() {
return 50;
}
@Override
public Object getNewValue(long currentCAS, Object currentValue) {
System.out.println("currentValue=" + currentValue
+ ",currentCAS=" + currentCAS);
return ((Integer) currentValue).intValue() + 1;
}
}))
this.cd.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class CASTest {
static int NUM = 100;
public static void main(String[] args) throws Exception {
XMemcachedClient mc = new XMemcachedClient();
mc.addServer("192.168.222.100", 11211);
// 設(shè)置初始值為0
mc.set("a", 0, 0);
CountDownLatch cdl = new CountDownLatch(NUM);
// 開NUM個(gè)線程遞增變量a
for (int i = 0; i < NUM; i++)
new CASThread(mc, cdl).start();
cdl.await();
// 打印結(jié)果,最后結(jié)果應(yīng)該為NUM
System.out.println("result=" + mc.get("a"));
mc.shutdown();
}
}
最高重試次數(shù)設(shè)置成了50,觀察輸出你就會(huì)知道cas沖突在高并發(fā)下非常頻繁,這個(gè)操作應(yīng)當(dāng)慎用。
說完cas,我們?cè)賮砜聪聏memcached對(duì)分布的支持。
1、如何添加多個(gè)memcached server?
通過XMemcachdClient.addServer(String ip,int port)方法,
XMemcachedClient mc = new XMemcachedClient();
mc.addServer(ip1, port1);
mc.addServer(ip2, port2);
mc.addServer(ip3, port3);
mc.addServer(ip4, port3);
mc.addServer(ip1, port1);
mc.addServer(ip2, port2);
mc.addServer(ip3, port3);
mc.addServer(ip4, port3);
2、怎么分布?
在添加了>=2個(gè)memcached server后,對(duì)XMemcachdClient的存儲(chǔ)、刪除等操作都將默認(rèn)根據(jù)key的哈希值模連接數(shù)的余數(shù)做分布,這也是spymemcached默認(rèn)的分布算法。這個(gè)算法簡(jiǎn)單快速,然而在添加或者移除memcached server后,緩存會(huì)大面積失效需要重組,這個(gè)代價(jià)太高,因此還有所謂Consistent Hashing算法,通過將memcached節(jié)點(diǎn)分布在一個(gè)0-2^128-1的環(huán)上,發(fā)送數(shù)據(jù)到某個(gè)節(jié)點(diǎn)經(jīng)過的跳躍次數(shù)可以縮減到O(lgn)次,并且在添加或者移除節(jié)點(diǎn)時(shí)最大限度的降低影響,這個(gè)算法的思想其實(shí)來源于p2p網(wǎng)絡(luò)的路由算法,不過路由算法比這個(gè)復(fù)雜多了,畢竟memcached的分布是在客戶端,因此不需要節(jié)點(diǎn)之間的通訊和路由表的存儲(chǔ)更新等。這個(gè)算法在java上的實(shí)現(xiàn)可以通過TreeMap紅黑樹,具體可以參考這里和這里。
在xmemcached啟動(dòng)Consistent Hashing如下:
XMemcachedClient client = new XMemcachedClient(new KetamaMemcachedSessionLocator(HashAlgorithm.CRC32_HASH));
client.addServer(ip, 12000);
client.addServer(ip, 12001);
client.addServer(ip, 11211);
client.addServer(ip, 12003);
client.addServer(ip, 12004);
client.addServer(ip, 12000);
client.addServer(ip, 12001);
client.addServer(ip, 11211);
client.addServer(ip, 12003);
client.addServer(ip, 12004);
散列函數(shù)采用CRC32,你也可以采用其他散列函數(shù),具體看場(chǎng)景測(cè)試而定,散列函數(shù)決定了你的查找節(jié)點(diǎn)效率和緩存重新分布的均衡程度。
在完成1.0-beta這個(gè)里程碑版本后,xmemcached將集中于穩(wěn)定性方面的測(cè)試和性能優(yōu)化。歡迎提交測(cè)試報(bào)告和建議,我的email killme2008@gmail.com