posts - 495,comments - 227,trackbacks - 0
          http://www.cnblogs.com/phinecos/archive/2012/02/16/2354834.html

          上一篇中介紹了SolrCloud的第一個(gè)模塊---構(gòu)建管理solr集群狀態(tài)信息的zookeeper集群。當(dāng)我們?cè)?/span>solr服務(wù)器啟動(dòng)時(shí)擁有了這樣一個(gè)Zookeeper集群后,顯然我們需要連接到Zookeeper集群的方便手段,在這一篇中我將對(duì)Zookeeper客戶端相關(guān)的各個(gè)封裝類進(jìn)行分析。

          SolrZkClient類是Solr服務(wù)器用來與Zookeeper集群進(jìn)行通信的接口類,它包含的主要組件有:

            private ConnectionManager connManager;
            private volatile SolrZooKeeper keeper;
            private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();

              其中ConnectionManagerWatcher的實(shí)現(xiàn)類,主要負(fù)責(zé)對(duì)客戶端與Zookeeper集群之間連接的狀態(tài)變化信息進(jìn)行響應(yīng),關(guān)于Watcher的詳細(xì)介紹,可以參考http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkWatches

          SolrZooKeeper類是一個(gè)包裝類,沒有實(shí)際意義,ZkCmdExecutor類是負(fù)責(zé)在連接失敗的情況下,重試某種操作特定次數(shù),具體的操作是ZkOperation這個(gè)抽象類的具體實(shí)現(xiàn)子類,其execute方法中包含了具體操作步驟,這些操作包括新建一個(gè)Znode節(jié)點(diǎn),讀取Znode節(jié)點(diǎn)數(shù)據(jù),創(chuàng)建Znode路徑,刪除Znode節(jié)點(diǎn)等Zookeeper操作。

          首先來看它的構(gòu)造函數(shù),先創(chuàng)建ConnectionManager對(duì)象來響應(yīng)兩端之間的狀態(tài)變化信息,然后ZkClientConnectionStrategy類是一個(gè)連接策略抽象類,它包含連接和重連兩種策略,并且采用模板方法模式,具體的實(shí)現(xiàn)是通過靜態(tài)累不類ZkUpdate來實(shí)現(xiàn)的,DefaultConnectionStrategy是它的一個(gè)實(shí)現(xiàn)子類,它覆寫了connectreconnect兩個(gè)連接策略方法。

          復(fù)制代碼
            public SolrZkClient(String zkServerAddress, int zkClientTimeout,
                ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
                TimeoutException, IOException {
              connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
                  + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
              strat.connect(zkServerAddress, zkClientTimeout, connManager,
                  new ZkUpdate() {
                    @Override
                    public void update(SolrZooKeeper zooKeeper) {
                      SolrZooKeeper oldKeeper = keeper;
                      keeper = zooKeeper;
                      if (oldKeeper != null) {
                        try {
                          oldKeeper.close();
                        } catch (InterruptedException e) {
                          // Restore the interrupted status
                          Thread.currentThread().interrupt();
                          log.error("", e);
                          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
                              "", e);
                        }
                      }
                    }
                  });
              connManager.waitForConnected(clientConnectTimeout);
              numOpens.incrementAndGet();
            }
          復(fù)制代碼

          值得注意的是,構(gòu)造函數(shù)中生成的ZkUpdate匿名類對(duì)象,它的update方法會(huì)被調(diào)用,

          在這個(gè)方法里,會(huì)首先將已有的老的SolrZooKeeperg關(guān)閉掉,然后放置上一個(gè)新的SolrZooKeeper。做好這些準(zhǔn)備工作以后,就會(huì)去連接Zookeeper服務(wù)器集群,

          connManager.waitForConnected(clientConnectTimeout);//連接zk服務(wù)器集群,默認(rèn)30秒超時(shí)時(shí)間

          其實(shí)具體的連接動(dòng)作是new SolrZooKeeper(serverAddress, timeout, watcher)引發(fā)的,上面那句代碼只是在等待指定時(shí)間,看是否已經(jīng)連接上。

          如果連接Zookeeper服務(wù)器集群成功,那么就可以進(jìn)行Zookeeper的常規(guī)操作了:

          1) 是否已經(jīng)連接

            public boolean isConnected() {
              return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
            }

          2) 是否存在某個(gè)路徑的Znode

          復(fù)制代碼
            public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
              if (retryOnConnLoss) {
                return zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public Stat execute() throws KeeperException, InterruptedException {
                    return keeper.exists(path, watcher);
                  }
                });
              } else {
                return keeper.exists(path, watcher);
              }
            }
          復(fù)制代碼

          3) 創(chuàng)建一個(gè)Znode節(jié)點(diǎn)

          復(fù)制代碼
            public String create(final String path, final byte data[], final List<ACL> acl, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
              if (retryOnConnLoss) {
                return zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public String execute() throws KeeperException, InterruptedException {
                    return keeper.create(path, data, acl, createMode);
                  }
                });
              } else {
                return keeper.create(path, data, acl, createMode);
              }
            }
          復(fù)制代碼

          4) 獲取指定路徑下的孩子Znode節(jié)點(diǎn)

          復(fù)制代碼
            public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
              if (retryOnConnLoss) {
                return zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public List<String> execute() throws KeeperException, InterruptedException {
                    return keeper.getChildren(path, watcher);
                  }
                });
              } else {
                return keeper.getChildren(path, watcher);
              }
            }
          復(fù)制代碼

          5) 獲取指定Znode上附加的數(shù)據(jù)

          復(fù)制代碼
            public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
              if (retryOnConnLoss) {
                return zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public byte[] execute() throws KeeperException, InterruptedException {
                    return keeper.getData(path, watcher, stat);
                  }
                });
              } else {
                return keeper.getData(path, watcher, stat);
              }
            }
          復(fù)制代碼

          6) 在指定Znode上設(shè)置數(shù)據(jù)

          復(fù)制代碼
            public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
              if (retryOnConnLoss) {
                return zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public Stat execute() throws KeeperException, InterruptedException {
                    return keeper.setData(path, data, version);
                  }
                });
              } else {
                return keeper.setData(path, data, version);
              }
            }
          復(fù)制代碼

          7) 創(chuàng)建路徑

          復(fù)制代碼
            public void makePath(String path, byte[] data, CreateMode createMode, Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
              if (log.isInfoEnabled()) {
                log.info("makePath: " + path);
              }
              boolean retry = true;
              
              if (path.startsWith("/")) {
                path = path.substring(1, path.length());
              }
              String[] paths = path.split("/");
              StringBuilder sbPath = new StringBuilder();
              for (int i = 0; i < paths.length; i++) {
                byte[] bytes = null;
                String pathPiece = paths[i];
                sbPath.append("/" + pathPiece);
                final String currentPath = sbPath.toString();
                Object exists = exists(currentPath, watcher, retryOnConnLoss);
                if (exists == null || ((i == paths.length -1) && failOnExists)) {
                  CreateMode mode = CreateMode.PERSISTENT;
                  if (i == paths.length - 1) {
                    mode = createMode;
                    bytes = data;
                    if (!retryOnConnLoss) retry = false;
                  }
                  try {
                    if (retry) {
                      final CreateMode finalMode = mode;
                      final byte[] finalBytes = bytes;
                      zkCmdExecutor.retryOperation(new ZkOperation() {
                        @Override
                        public Object execute() throws KeeperException, InterruptedException {
                          keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
                          return null;
                        }
                      });
                    } else {
                      keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
                    }
                  } catch (NodeExistsException e) {
                    
                    if (!failOnExists) {
                      // TODO: version ? for now, don't worry about race
                      setData(currentPath, data, -1, retryOnConnLoss);
                      // set new watch
                      exists(currentPath, watcher, retryOnConnLoss);
                      return;
                    }
                    
                    // ignore unless it's the last node in the path
                    if (i == paths.length - 1) {
                      throw e;
                    }
                  }
                  if(i == paths.length -1) {
                    // set new watch
                    exists(currentPath, watcher, retryOnConnLoss);
                  }
                } else if (i == paths.length - 1) {
                  // TODO: version ? for now, don't worry about race
                  setData(currentPath, data, -1, retryOnConnLoss);
                  // set new watch
                  exists(currentPath, watcher, retryOnConnLoss);
                }
              }
            }
          復(fù)制代碼

          8) 刪除指定Znode

          復(fù)制代碼
            public void delete(final String path, final int version, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
              if (retryOnConnLoss) {
                zkCmdExecutor.retryOperation(new ZkOperation() {
                  @Override
                  public Stat execute() throws KeeperException, InterruptedException {
                    keeper.delete(path, version);
                    return null;
                  }
                });
              } else {
                keeper.delete(path, version);
              }
            }
          復(fù)制代碼

                   我們?cè)倩剡^頭來看看ConnectionManager類是如何響應(yīng)兩端的連接狀態(tài)信息的變化的,它最重要的方法是process方法,當(dāng)它被觸發(fā)回調(diào)時(shí),會(huì)從WatchedEvent參數(shù)中得到事件的各種狀態(tài)信息,比如連接成功,會(huì)話過期(此時(shí)需要進(jìn)行重連),連接斷開等。

          復(fù)制代碼
            public synchronized void process(WatchedEvent event) {
              if (log.isInfoEnabled()) {
                log.info("Watcher " + this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType());
              }

              state = event.getState();
              if (state == KeeperState.SyncConnected) {
                connected = true;
                clientConnected.countDown();
              } else if (state == KeeperState.Expired) {
                connected = false;
                log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
                //嘗試重新連接zk服務(wù)器
                try {
                  connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
                      new ZkClientConnectionStrategy.ZkUpdate() {
                        @Override
                        public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
                          synchronized (connectionStrategy) {
                            waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
                            client.updateKeeper(keeper);
                            if (onReconnect != null) {
                              onReconnect.command();
                            }
                            synchronized (ConnectionManager.this) {
                              ConnectionManager.this.connected = true;
                            }
                          }
                          
                        }
                      });
                } catch (Exception e) {
                  SolrException.log(log, "", e);
                }
                log.info("Connected:" + connected);
              } else if (state == KeeperState.Disconnected) {
                connected = false;
              } else {
                connected = false;
              }
              notifyAll();
            }
          復(fù)制代碼

           

           

          posted on 2012-07-04 18:41 SIMONE 閱讀(630) 評(píng)論(0)  編輯  收藏 所屬分類: solr
          主站蜘蛛池模板: 台安县| 临洮县| 永泰县| 新源县| 苗栗县| 鄱阳县| 仪陇县| 隆安县| 和平区| 浏阳市| 仁寿县| 开江县| 河源市| 邮箱| 哈巴河县| 新竹市| 包头市| 桃园县| 湟源县| 松滋市| 大田县| 沂南县| 顺平县| 澎湖县| 沙田区| 万安县| 噶尔县| 长沙县| 沈阳市| 永福县| 右玉县| 安陆市| 鄂伦春自治旗| 江油市| 新源县| 固安县| 唐海县| 泌阳县| 萍乡市| 德清县| 准格尔旗|