jinfeng_wang

          G-G-S,D-D-U!

          BlogJava 首頁 新隨筆 聯系 聚合 管理
            400 Posts :: 0 Stories :: 296 Comments :: 0 Trackbacks
          http://m635674608.iteye.com/blog/2297558


          Java代碼  收藏代碼
          1. BinaryJedisCluster   
          2. public String set(final byte[] key, final byte[] value) {  
          3.   return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {  
          4.     @Override  
          5.     public String execute(Jedis connection) {  
          6.       return connection.set(key, value);  
          7.     }   
          8.   }.runBinary(key);  
          9. }  

             

          Java代碼  收藏代碼
          1. JedisClusterCommand  
          2.  public T runBinary(byte[] key) {  
          3.     if (key == null) {  
          4.       throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");  
          5.     }  
          6.   
          7.     return runWithRetries(key, this.redirections, falsefalse);  
          8.   }  
          9.   
          10.  private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) {  
          11.     if (redirections <= 0) {  
          12.       throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");  
          13.     }  
          14.   
          15.     Jedis connection = null;  
          16.     try {  
          17.   
          18.       if (asking) {  
          19.         // TODO: Pipeline asking with the original command to make it  
          20.         // faster....  
          21.         connection = askConnection.get();  
          22.         connection.asking();  
          23.   
          24.         // if asking success, reset asking flag  
          25.         asking = false;  
          26.       } else {  
          27.         if (tryRandomNode) {  
          28.           connection = connectionHandler.getConnection();  
          29.         } else {  
          30. //獲取 連接  
          31.           connection = connectionHandler.getConnectionFromSlot(  
          32. //獲取槽   
          33. JedisClusterCRC16.getSlot(key));  
          34.         }  
          35.       }  
          36.   
          37.       return execute(connection);  
          38.     } catch (JedisConnectionException jce) {  
          39.       if (tryRandomNode) {  
          40.         // maybe all connection is down  
          41.         throw jce;  
          42.       }  
          43.   
          44.       // release current connection before recursion  
          45.       releaseConnection(connection);  
          46.       connection = null;  
          47.   
          48.       // retry with random connection  
          49.       return runWithRetries(key, redirections - 1true, asking);  
          50.     } catch (JedisRedirectionException jre) {  
          51.       // if MOVED redirection occurred,  
          52.       if (jre instanceof JedisMovedDataException) {  
          53.         // it rebuilds cluster's slot cache  
          54.         // recommended by Redis cluster specification  
          55.         this.connectionHandler.renewSlotCache(connection);  
          56.       }  
          57.   
          58.       // release current connection before recursion or renewing  
          59.       releaseConnection(connection);  
          60.       connection = null;  
          61.   
          62.       if (jre instanceof JedisAskDataException) {  
          63.         asking = true;  
          64.       askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));  
          65.       } else if (jre instanceof JedisMovedDataException) {  
          66.       } else {  
          67.         throw new JedisClusterException(jre);  
          68.       }  
          69.   
          70.       return runWithRetries(key, redirections - 1false, asking);  
          71.     } finally {  
          72.       releaseConnection(connection);  
          73.     }  
          74.   }  
          75.   
          76.  @Override  
          77.   public Jedis getConnectionFromSlot(int slot) {  
          78.     JedisPool connectionPool = cache.getSlotPool(slot);  
          79.     if (connectionPool != null) {  
          80.       // It can't guaranteed to get valid connection because of node  
          81.       // assignment  
          82.       return connectionPool.getResource();  
          83.     } else {  
          84.       return getConnection();  
          85.     }  
          86.   }  

            

          Java代碼  收藏代碼
          1. public abstract class JedisClusterConnectionHandler {  
          2.   protected final JedisClusterInfoCache cache;  
          3.   
          4.   public JedisClusterConnectionHandler(Set<HostAndPort> nodes,  
          5.                                        final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {  
          6.     this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout);  
          7.     //通過slot 初始化集群信息  
          8.     initializeSlotsCache(nodes, poolConfig);  
          9.   }  
          10.   
          11.   abstract Jedis getConnection();  
          12.   
          13.   abstract Jedis getConnectionFromSlot(int slot);  
          14.   
          15.   public Jedis getConnectionFromNode(HostAndPort node) {  
          16.     cache.setNodeIfNotExist(node);  
          17.     return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();  
          18.   }  
          19.   
          20. public class JedisClusterInfoCache {  
          21.   private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();  
          22.   private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();  
          23.   
          24.   private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  
          25.   private final Lock r = rwl.readLock();  
          26.   private final Lock w = rwl.writeLock();  
          27.   private final GenericObjectPoolConfig poolConfig;  
          28.   
          29.   private int connectionTimeout;  
          30.   private int soTimeout;  
          31.   
          32.   private static final int MASTER_NODE_INDEX = 2;  
          33.   
          34.   public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {  
          35.     this(poolConfig, timeout, timeout);  
          36.   }  
          37.   
          38.   public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,  
          39.       final int connectionTimeout, final int soTimeout) {  
          40.     this.poolConfig = poolConfig;  
          41.     this.connectionTimeout = connectionTimeout;  
          42.     this.soTimeout = soTimeout;  
          43.   }  
          44.   
          45.   public void discoverClusterNodesAndSlots(Jedis jedis) {  
          46.     w.lock();  
          47.   
          48.     try {  
          49.       this.nodes.clear();  
          50.       this.slots.clear();  
          51.   
          52.       List<Object> slots = jedis.clusterSlots();  
          53.   
          54.       for (Object slotInfoObj : slots) {  
          55.         List<Object> slotInfo = (List<Object>) slotInfoObj;  
          56.   
          57.         if (slotInfo.size() <= MASTER_NODE_INDEX) {  
          58.           continue;  
          59.         }  
          60.   
          61.         List<Integer> slotNums = getAssignedSlotArray(slotInfo);  
          62.   
          63.         // hostInfos  
          64.         int size = slotInfo.size();  
          65.         for (int i = MASTER_NODE_INDEX; i < size; i++) {  
          66.           List<Object> hostInfos = (List<Object>) slotInfo.get(i);  
          67.           if (hostInfos.size() <= 0) {  
          68.             continue;  
          69.           }  
          70.   
          71.           HostAndPort targetNode = generateHostAndPort(hostInfos);  
          72.           setNodeIfNotExist(targetNode);  
          73.           if (i == MASTER_NODE_INDEX) {  
          74.             assignSlotsToNode(slotNums, targetNode);  
          75.           }  
          76.         }  
          77.       }  
          78.     } finally {  
          79.       w.unlock();  
          80.     }  
          81.   }  
          82.   //初始化集群信息   
          83.   public void discoverClusterSlots(Jedis jedis) {  
          84.     w.lock();  
          85.   
          86.     try {  
          87.       this.slots.clear();  
          88.       //通過 slots 命令獲取集群信息  
          89.       List<Object> slots = jedis.clusterSlots();  
          90.   
          91.       for (Object slotInfoObj : slots) {  
          92.         List<Object> slotInfo = (List<Object>) slotInfoObj;  
          93.   
          94.         if (slotInfo.size() <= 2) {  
          95.           continue;  
          96.         }  
          97.   
          98.         List<Integer> slotNums = getAssignedSlotArray(slotInfo);  
          99.   
          100.         // hostInfos  
          101.         List<Object> hostInfos = (List<Object>) slotInfo.get(2);  
          102.         if (hostInfos.size() <= 0) {  
          103.           continue;  
          104.         }  
          105.   
          106.         // at this time, we just use master, discard slave information  
          107.         HostAndPort targetNode = generateHostAndPort(hostInfos);  
          108.   
          109.         setNodeIfNotExist(targetNode);  
          110.         assignSlotsToNode(slotNums, targetNode);  
          111.       }  
          112.     } finally {  
          113.       w.unlock();  
          114.     }  
          115.   }  
          116.   
          117.   private HostAndPort generateHostAndPort(List<Object> hostInfos) {  
          118.     return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),  
          119.         ((Long) hostInfos.get(1)).intValue());  
          120.   }  
          121.   
          122.   public void setNodeIfNotExist(HostAndPort node) {  
          123.     w.lock();  
          124.     try {  
          125.       String nodeKey = getNodeKey(node);  
          126.       if (nodes.containsKey(nodeKey)) return;  
          127.   
          128.       JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),  
          129.               connectionTimeout, soTimeout, null0null);  
          130.       nodes.put(nodeKey, nodePool);  
          131.     } finally {  
          132.       w.unlock();  
          133.     }  
          134.   }  
          135.   
          136.   public void assignSlotToNode(int slot, HostAndPort targetNode) {  
          137.     w.lock();  
          138.     try {  
          139.       JedisPool targetPool = nodes.get(getNodeKey(targetNode));  
          140.   
          141.       if (targetPool == null) {  
          142.         setNodeIfNotExist(targetNode);  
          143.         targetPool = nodes.get(getNodeKey(targetNode));  
          144.       }  
          145.       slots.put(slot, targetPool);  
          146.     } finally {  
          147.       w.unlock();  
          148.     }  
          149.   }  
          150.   
          151.   public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {  
          152.     w.lock();  
          153.     try {  
          154.       JedisPool targetPool = nodes.get(getNodeKey(targetNode));  
          155.   
          156.       if (targetPool == null) {  
          157.         setNodeIfNotExist(targetNode);  
          158.         targetPool = nodes.get(getNodeKey(targetNode));  
          159.       }  
          160.   
          161.       for (Integer slot : targetSlots) {  
          162.         slots.put(slot, targetPool);  
          163.       }  
          164.     } finally {  
          165.       w.unlock();  
          166.     }  
          167.   }  
          168.   
          169.   public JedisPool getNode(String nodeKey) {  
          170.     r.lock();  
          171.     try {  
          172.       return nodes.get(nodeKey);  
          173.     } finally {  
          174.       r.unlock();  
          175.     }  
          176.   }  
          177.   
          178.   public JedisPool getSlotPool(int slot) {  
          179.     r.lock();  
          180.     try {  
          181.       return slots.get(slot);  
          182.     } finally {  
          183.       r.unlock();  
          184.     }  
          185.   }  
          186.   
          187.   public Map<String, JedisPool> getNodes() {  
          188.     r.lock();  
          189.     try {  
          190.       return new HashMap<String, JedisPool>(nodes);  
          191.     } finally {  
          192.       r.unlock();  
          193.     }  
          194.   }  
          195.   
          196.   public static String getNodeKey(HostAndPort hnp) {  
          197.     return hnp.getHost() + ":" + hnp.getPort();  
          198.   }  
          199.   
          200.   public static String getNodeKey(Client client) {  
          201.     return client.getHost() + ":" + client.getPort();  
          202.   }  
          203.   
          204.   public static String getNodeKey(Jedis jedis) {  
          205.     return getNodeKey(jedis.getClient());  
          206.   }  
          207.   
          208.   private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {  
          209.     List<Integer> slotNums = new ArrayList<Integer>();  
          210.     for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))  
          211.         .intValue(); slot++) {  
          212.       slotNums.add(slot);  
          213.     }  
          214.     return slotNums;  
          215.   }  
          216.   
          217. }  
          218.   
          219.     
          220.   public Map<String, JedisPool> getNodes() {  
          221.     return cache.getNodes();  
          222.   }  
          223.   
          224.   private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) {  
          225.     for (HostAndPort hostAndPort : startNodes) {  
          226.       Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());  
          227.       try {  
          228.         cache.discoverClusterNodesAndSlots(jedis);  
          229.         break;  
          230.       } catch (JedisConnectionException e) {  
          231.         // try next nodes  
          232.       } finally {  
          233.         if (jedis != null) {  
          234.           jedis.close();  
          235.         }  
          236.       }  
          237.     }  
          238.   
          239.     for (HostAndPort node : startNodes) {  
          240.       cache.setNodeIfNotExist(node);  
          241.     }  
          242.   }  
          243.   
          244.   public void renewSlotCache() {  
          245.     for (JedisPool jp : getShuffledNodesPool()) {  
          246.       Jedis jedis = null;  
          247.       try {  
          248.         jedis = jp.getResource();  
          249.         cache.discoverClusterSlots(jedis);  
          250.         break;  
          251.       } catch (JedisConnectionException e) {  
          252.         // try next nodes  
          253.       } finally {  
          254.         if (jedis != null) {  
          255.           jedis.close();  
          256.         }  
          257.       }  
          258.     }  
          259.   }  
          260.   
          261.   public void renewSlotCache(Jedis jedis) {  
          262.     try {  
          263.       cache.discoverClusterSlots(jedis);  
          264.     } catch (JedisConnectionException e) {  
          265.       renewSlotCache();  
          266.     }  
          267.   }  
          268.   
          269.   protected List<JedisPool> getShuffledNodesPool() {  
          270.     List<JedisPool> pools = new ArrayList<JedisPool>();  
          271.     pools.addAll(cache.getNodes().values());  
          272.     Collections.shuffle(pools);  
          273.     return pools;  
          274.   }  
          275. }  

             總結:

            1.JedisCluster 會初始化一個 連接獲取集群信息通過 solts 命令。(JedisClusterInfoCache 構造方法初始化)

            2.get ,set 的時候。會通過key JedisClusterCRC16.getSlot(key) 定位到solt

            3. 然后根據solt獲取 jedis

          public Jedis getConnectionFromSlot(int slot) {
              JedisPool connectionPool = cache.getSlotPool(slot);

            4.執行操作

           

            讀操作:主庫,從庫都會讀

            寫操作:主庫寫

          posted on 2016-12-20 16:29 jinfeng_wang 閱讀(821) 評論(0)  編輯  收藏 所屬分類: 2016-REDIS
          主站蜘蛛池模板: 中宁县| 内丘县| 武乡县| 灵山县| 冀州市| 阆中市| 哈尔滨市| 汶上县| 西城区| 溆浦县| 武安市| 志丹县| 金平| 图们市| 平泉县| 达拉特旗| 巴彦县| 侯马市| 禄劝| 保德县| 咸丰县| 大理市| 吉木乃县| 台安县| 黑山县| 邵阳县| 拉孜县| 秭归县| 德保县| 武川县| 威远县| 开平市| 长子县| 芜湖县| 乐清市| 白水县| 广元市| 栾城县| 罗城| 玉溪市| 贵溪市|