Session連接
Zookeeper客戶端和服務(wù)端維持一個(gè)長(zhǎng)連接,每隔10s向服務(wù)端發(fā)送一個(gè)心跳,服務(wù)端返回客戶端一個(gè)響應(yīng)。這就是一個(gè)Session連接,擁有全局唯一的session id。Session連接通常是一直有效,如果因?yàn)榫W(wǎng)絡(luò)原因斷開了連接,客戶端會(huì)使用相同的session id進(jìn)行重連。由于服務(wù)端保留了session的各種狀態(tài),尤其是各種瞬時(shí)節(jié)點(diǎn)是否刪除依賴于session是否失效。
Session失效問題
通常客戶端主動(dòng)關(guān)閉連接認(rèn)為是一次session失效。另外也有可能因?yàn)槠渌粗颍缇W(wǎng)絡(luò)超時(shí)導(dǎo)致的session失效問題。在服務(wù)端看來(lái),無(wú)法區(qū)分session失效是何種情況,一次一旦發(fā)生session失效,一定時(shí)間后就會(huì)將session持有的所有watcher以及瞬時(shí)節(jié)點(diǎn)刪除。
而對(duì)于Zookeeper客戶端而言,一旦發(fā)生失效不知道是否該重連,這涉及到watcher和瞬時(shí)節(jié)點(diǎn)問題,因此Zookeeper客戶端認(rèn)為,一旦發(fā)生了seesion失效,那么就認(rèn)為客戶端死掉了。從而所有操作都不能夠進(jìn)行。參考 How should I handle SESSION_EXPIRED?
解決方案
對(duì)于只是簡(jiǎn)單查詢服務(wù)的客戶端而言,session失效后只需要重新建立連接即可。而對(duì)于需要處理瞬時(shí)節(jié)點(diǎn)以及各種watcher的服務(wù)來(lái)說(shuō),應(yīng)用程序需要處理session失效或者重連帶來(lái)的副作用。
下面的邏輯提供了一種簡(jiǎn)單的解決session重連的問題,這是指重新生成新的連接。
public static org.apache.zookeeper.ZooKeeper getZooKeeper() {
if (zookeeper == null) {
synchronized (ZookeeperClient.class) {
if (zookeeper == null) {
latch = new CountDownLatch(1);
zookeeper = buildClient();//如果失敗,下次還有成功的機(jī)會(huì)
long startTime = System.currentTimeMillis();
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
final SocketAddress remoteAddres = zookeeper.testableRemoteSocketAddress();
System.out.println("[SUC-CORE] local host: " + zookeeper.testableLocalSocketAddress());
System.out.println("[SUC-CORE] remote host: " + remoteAddres);
System.out.println("[SUC-CORE] zookeeper session id: " + zookeeper.getSessionId());
final String remoteHost = remoteAddres != null ? ((InetSocketAddress) remoteAddres).getAddress().getHostAddress() : "";
System.out.println("[SUC-CORE] init cost: " + (System.currentTimeMillis() - startTime) + "(ms) " + remoteHost);
latch = null;
}
}
}
}
return zookeeper;
}
上面的代碼只是簡(jiǎn)單的使用一個(gè)Double-Check來(lái)維持Zookeeper客戶端的單實(shí)例。保證總是有機(jī)會(huì)重建客戶端以及只有一個(gè)單例(這是因?yàn)閆ookeeper客戶端是線程安全的)。
例外上面的例子中繼承了org.apache.zookeeper.ZooKeeper類,以便能夠拿到session對(duì)于的服務(wù)端ip地址以及客戶端地址,方便調(diào)試問題。
在構(gòu)建客戶端的時(shí)候是需要設(shè)置session超時(shí)的時(shí)間,例如下面的代碼就是30秒。
private static ZooKeeper buildClient() {
final String rootPath = getRootPath();
final String connectString = SystemConfig.getInstance().getString("zookeeper.ips",//
"192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181,192.168.10.4:2181,192.168.10.5:2181");
System.out.printf("[SUC-CORE] rootPath: %1s\n", rootPath);
System.out.printf("[SUC-CORE] connectString:%1s\n", connectString);
try {
return new ZooKeeper(connectString + rootPath, 30000, new SessionWatcher());
} catch (IOException e) {
throw new RuntimeException("init zookeeper fail.", e);
}
}
為了處理連接建立成功以及斷開問題,我們需要一個(gè)Watcher來(lái)處理此問題。
static class SessionWatcherimplements Watcher {
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
if (latch !=
null) {
latch.countDown();
}
}
else if (event.getState() == KeeperState.Expired) {
System.out.println("[SUC-CORE] session expired. now rebuilding

");
//session expired, may be never happending.
//close old client and rebuild new client
close();
getZooKeeper();
}
}
}
一旦檢測(cè)到session失效了(Expired),那么久銷毀已經(jīng)建立的客戶端實(shí)例,重新生成一個(gè)客戶端實(shí)例。
/**
* 關(guān)閉zookeeper連接,釋放資源
*/
public static void close() {
System.out.println("[SUC-CORE] close");
if (zookeeper != null) {
try {
zookeeper.close();
zookeeper = null;
} catch (InterruptedException e) {
//ignore exception
}
}
}
這是一種簡(jiǎn)單的處理Session expired問題的方法,顯然這不會(huì)處理瞬時(shí)節(jié)點(diǎn)的問題,因此如果有相關(guān)的需求,業(yè)務(wù)系統(tǒng)(應(yīng)用程序)需要自己修復(fù)問題。
測(cè)試方案
根據(jù)Is there an easy way to expire a session for testing? 提供的測(cè)試方法,寫一個(gè)簡(jiǎn)單的例子試驗(yàn)下。
public static void main(String[] args) throws Exception {
ZooKeeper zk = ZookeeperClient.getZooKeeper();
long sessionId = zk.getSessionId();
//
final String rootPath = ZookeeperClient.getRootPath();
final String connectString = SystemConfig.getInstance().getString("zookeeper.ips",//
"192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181,192.168.10.4:2181,192.168.10.5:2181");
// close the old connection
new ZooKeeper(connectString + rootPath, 30000, null, sessionId, null).close();
Thread.sleep(10000L);
//
// rebuild a new session
long newSessionid = ZookeeperClient.getZooKeeper().getSessionId();
// check the new session
String status = newSessionid != sessionId ? "OK" : "FAIL";
System.out.println(format("%s --> %s %s", sessionId, newSessionid, status));
// close the client
ZookeeperClient.getZooKeeper().close();
}
最后能夠自動(dòng)處理session expired的問題。實(shí)驗(yàn)中看到確實(shí)執(zhí)行了session expired的邏輯重新生成了新的session。