Redis客戶端與服務(wù)器端的通信協(xié)議是如此簡(jiǎn)單
RESP協(xié)議
RESP(REdis Serialization Protocol)是redis server與redis client的通信協(xié)議。
- TCP Port 6379
- Request-Response模型。2個(gè)例外,1)pipeline;2)pub/sub
- 5種DataType,Simple String(+);Errors(-);Integers(:);Bulk String($);Arrays(*)
\r\n
(CRLF)是結(jié)束符- Simple String 例子:
"+OK\r\n"
- Errors 例子:
-WRONGTYPE Operation against a key holding the wrong kind of value
- Integer 例子:
":1000\r\n"
- Bulk String 例子:
"$6\r\nfoobar\r\n"
6表示后面有6個(gè)byte的長(zhǎng)度 - Null 例子:
"$-1\r\n"
- Arrays 例子:
"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
2表示有2個(gè)元素;"*0\r\n"
表示空數(shù)組 - 客戶端發(fā)送命令:就是Bulk String。例子:
llen mylist
->*2\r\n$4\r\nllen\r\n$6\r\nmylist\r\n
- redis服務(wù)器回答RESP DataType。例子:
:48293\r\n
Jedis對(duì)RESP協(xié)議的抽象
- Protocol是實(shí)現(xiàn)上述RESP協(xié)議的主要類,其中可以看到
sendCommand(final RedisOutputStream os, final byte[] command, final byte[]... args)
是如何根據(jù)協(xié)議拼接字符串發(fā)送到redis server,Object read(final RedisInputStream is)
是如何接收redis server的返回,并且轉(zhuǎn)換為Java Object。 BinaryXxxCommands <- BinaryJedis, XxxCommands <- Jedis
用來(lái)抽象所有通過(guò)二進(jìn)制流來(lái)發(fā)送的Redis命令XxxCommands <- Jedis
用來(lái)抽象類似ClusterCommands的命令,最終都是走的二進(jìn)制流,去掉Binary一層估計(jì)是作者覺得厭煩了。不對(duì)之處還請(qǐng)賜教。Commands, Connection <- BinaryClient <- Client
抽象了網(wǎng)絡(luò)發(fā)送命令和接收回復(fù),其中Client將參數(shù)encode為byte[],然后調(diào)用BinaryClient的方法;BinaryClient調(diào)用Connection#sendCommand;sendCommand調(diào)用connect(),構(gòu)造RedisInputStream和RedisOutputStream,用Protocol.sendCommand來(lái)發(fā)送命令;client.getXxxReply()首先將outputstream中的內(nèi)容flush出去,然后調(diào)用Protocol.read來(lái)處理接收到的返回值。/* 發(fā)送命令 Connection.java */
protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
connect();
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
public void connect() {
if (!isConnected()) {
try {
socket = new Socket();
// ->@wjw_add
socket.setReuseAddress(true);
socket.setKeepAlive(true); // Will monitor the TCP connection is
// valid
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
// ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method,
// the underlying socket is closed
// immediately
// <-@wjw_add
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
socket.setSoTimeout(soTimeout);
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
}
/* 接收回復(fù) */
public String getBulkReply() {
final byte[] result = getBinaryBulkReply();
if (null != result) {
return SafeEncoder.encode(result);
} else {
return null;
}
}
public byte[] getBinaryBulkReply() {
flush();
pipelinedCommands--;
return (byte[]) readProtocolWithCheckingBroken();
}
protected Object readProtocolWithCheckingBroken() {
try {
return Protocol.read(inputStream);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
}
}Jedis通過(guò)Pipeline這個(gè)類來(lái)對(duì)redis的pipeline進(jìn)行抽象,
jedis.pipelined()
返回一個(gè)Pipeline實(shí)例,并且這個(gè)Pipeline實(shí)例的client就是當(dāng)前jedis實(shí)例的client;調(diào)用pipeline.a_redis_command()
的時(shí)候會(huì)有一個(gè)responseList
,用來(lái)記錄每個(gè)command應(yīng)該對(duì)應(yīng)的response;pipeline.syncAndReturnAll()
會(huì)調(diào)用client.getAll()
將所有command一次flush()
出去,然后拿回List<Object>,再將這些Object填充到responseList中。
Jedis使用注意事項(xiàng)
Jedis instance本身不是線程安全的!要用JedisPool
//將JedisPool定義為spring單例
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
Jedis jedis = null;
try {
jedis = pool.getResource();
/// ... do stuff here ... for example
jedis.set("foo", "bar");
String foobar = jedis.get("foo");
jedis.zadd("sose", 0, "car"); jedis.zadd("sose", 0, "bike");
Set<String> sose = jedis.zrange("sose", 0, -1);
} finally {
if (jedis != null) {
jedis.close();
}
}
/// ... when closing your application:
pool.destroy();JedisPool是一個(gè)包裝模式,內(nèi)部就是Apache Common Pool 2, Pool里面裝的是Jedis。Jedis之所以不是線程安全的主要是由于Jedis類中的fields(client, pipeline, transaction)沒有做同步。如果每個(gè)thread都有一份Jedis實(shí)例,其實(shí)也不存在線程安全問(wèn)題,就是要注意使用完了需要
jedis.close()
。JedisPool和DBCP的Pool一樣,就是用來(lái)創(chuàng)建Jedis實(shí)例,然后提供給線程使用,Pool技術(shù)能夠復(fù)用已經(jīng)標(biāo)記為IDLE的Jedis,以此來(lái)提供內(nèi)存利用率和減小開銷。
小結(jié)
- Redis的通信協(xié)議簡(jiǎn)單容易實(shí)現(xiàn)
- Jedis在實(shí)現(xiàn)協(xié)議的時(shí)候用的Client將Connection和Command解耦,中規(guī)中矩,值得學(xué)習(xí)
- JedisPool用了Apache Common Pool來(lái)做到ThreadSafe