jinfeng_wang

          G-G-S,D-D-U!

          BlogJava 首頁 新隨筆 聯(lián)系 聚合 管理
            400 Posts :: 0 Stories :: 296 Comments :: 0 Trackbacks
          https://blog.huachao.me/2016/2/%E6%B7%B1%E5%85%A5Jedis/?utm_source=tuicool&utm_medium=referral

          Redis客戶端與服務器端的通信協(xié)議是如此簡單

          RESP協(xié)議

          RESP(REdis Serialization Protocol)是redis server與redis client的通信協(xié)議。

          • TCP Port 6379
          • Request-Response模型。2個例外,1)pipeline;2)pub/sub
          • 5種DataType,Simple String(+);Errors(-);Integers(:);Bulk String($);Arrays(*)
          • \r\n(CRLF)是結束符
          • Simple String 例子:"+OK\r\n"
          • Errors 例子:-WRONGTYPE Operation against a key holding the wrong kind of value
          • Integer 例子:":1000\r\n"
          • Bulk String 例子:"$6\r\nfoobar\r\n" 6表示后面有6個byte的長度
          • Null 例子:"$-1\r\n"
          • Arrays 例子:"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n" 2表示有2個元素; "*0\r\n"表示空數(shù)組
          • 客戶端發(fā)送命令:就是Bulk String。例子:llen mylist -> *2\r\n$4\r\nllen\r\n$6\r\nmylist\r\n
          • redis服務器回答RESP DataType。例子::48293\r\n

          Jedis對RESP協(xié)議的抽象

          Jedis類圖

          • Protocol是實現(xiàn)上述RESP協(xié)議的主要類,其中可以看到sendCommand(final RedisOutputStream os, final byte[] command, final byte[]... args)是如何根據(jù)協(xié)議拼接字符串發(fā)送到redis server,Object read(final RedisInputStream is)是如何接收redis server的返回,并且轉換為Java Object。
          • BinaryXxxCommands <- BinaryJedis, XxxCommands <- Jedis 用來抽象所有通過二進制流來發(fā)送的Redis命令
          • XxxCommands <- Jedis用來抽象類似ClusterCommands的命令,最終都是走的二進制流,去掉Binary一層估計是作者覺得厭煩了。不對之處還請賜教。
          • Commands, Connection <- BinaryClient <- Client抽象了網(wǎng)絡發(fā)送命令和接收回復,其中Client將參數(shù)encode為byte[],然后調用BinaryClient的方法;BinaryClient調用Connection#sendCommand;sendCommand調用connect(),構造RedisInputStream和RedisOutputStream,用Protocol.sendCommand來發(fā)送命令;client.getXxxReply()首先將outputstream中的內容flush出去,然后調用Protocol.read來處理接收到的返回值。

            /* 發(fā)送命令 Connection.java */
            protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
            try {
            connect();
            Protocol.sendCommand(outputStream, cmd, args);
            pipelinedCommands++;
            return this;
            } catch (JedisConnectionException ex) {
            // Any other exceptions related to connection?
            broken = true;
            throw ex;
            }
            }

            public void connect() {
            if (!isConnected()) {
            try {
            socket = new Socket();
            // ->@wjw_add
            socket.setReuseAddress(true);
            socket.setKeepAlive(true); // Will monitor the TCP connection is
            // valid
            socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
            // ensure timely delivery of data
            socket.setSoLinger(true, 0); // Control calls close () method,
            // the underlying socket is closed
            // immediately
            // <-@wjw_add

            socket.connect(new InetSocketAddress(host, port), connectionTimeout);
            socket.setSoTimeout(soTimeout);
            outputStream = new RedisOutputStream(socket.getOutputStream());
            inputStream = new RedisInputStream(socket.getInputStream());
            } catch (IOException ex) {
            broken = true;
            throw new JedisConnectionException(ex);
            }
            }
            }

            /* 接收回復 */
            public String getBulkReply() {
            final byte[] result = getBinaryBulkReply();
            if (null != result) {
            return SafeEncoder.encode(result);
            } else {
            return null;
            }
            }

            public byte[] getBinaryBulkReply() {
            flush();
            pipelinedCommands--;
            return (byte[]) readProtocolWithCheckingBroken();
            }

            protected Object readProtocolWithCheckingBroken() {
            try {
            return Protocol.read(inputStream);
            } catch (JedisConnectionException exc) {
            broken = true;
            throw exc;
            }
            }
          • Jedis通過Pipeline這個類來對redis的pipeline進行抽象,jedis.pipelined()返回一個Pipeline實例,并且這個Pipeline實例的client就是當前jedis實例的client;調用pipeline.a_redis_command()的時候會有一個responseList,用來記錄每個command應該對應的response;pipeline.syncAndReturnAll()會調用client.getAll()將所有command一次flush()出去,然后拿回List<Object>,再將這些Object填充到responseList中。

          Jedis使用注意事項

          • Jedis instance本身不是線程安全的!要用JedisPool

            //將JedisPool定義為spring單例
            JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");

            Jedis jedis = null;
            try {
            jedis = pool.getResource();
            /// ... do stuff here ... for example
            jedis.set("foo", "bar");
            String foobar = jedis.get("foo");
            jedis.zadd("sose", 0, "car"); jedis.zadd("sose", 0, "bike");
            Set<String> sose = jedis.zrange("sose", 0, -1);
            } finally {
            if (jedis != null) {
            jedis.close();
            }
            }
            /// ... when closing your application:
            pool.destroy();
          • JedisPool是一個包裝模式,內部就是Apache Common Pool 2, Pool里面裝的是Jedis。Jedis之所以不是線程安全的主要是由于Jedis類中的fields(client, pipeline, transaction)沒有做同步。如果每個thread都有一份Jedis實例,其實也不存在線程安全問題,就是要注意使用完了需要jedis.close()。JedisPool和DBCP的Pool一樣,就是用來創(chuàng)建Jedis實例,然后提供給線程使用,Pool技術能夠復用已經(jīng)標記為IDLE的Jedis,以此來提供內存利用率和減小開銷。

          小結

          • Redis的通信協(xié)議簡單容易實現(xiàn)
          • Jedis在實現(xiàn)協(xié)議的時候用的Client將Connection和Command解耦,中規(guī)中矩,值得學習
          • JedisPool用了Apache Common Pool來做到ThreadSafe
          posted on 2016-12-20 15:07 jinfeng_wang 閱讀(190) 評論(0)  編輯  收藏 所屬分類: 2016-REDIS
          主站蜘蛛池模板: 扶沟县| 鄂温| 桃江县| 寿宁县| 永济市| 晴隆县| 尤溪县| 田东县| 康乐县| 青铜峡市| 涪陵区| 连州市| 那坡县| 平山县| 丰台区| 伊川县| 饶河县| 嘉黎县| 高阳县| 德清县| 都安| 南木林县| 宕昌县| 滦平县| 繁峙县| 巴彦淖尔市| 田林县| 合水县| 辛集市| 永和县| 乌苏市| 承德县| 中西区| 门头沟区| 桃园县| 南溪县| 金乡县| 萝北县| 阜康市| 岳阳县| 岫岩|