我的家園

          我的家園
            RelicaSetStatus 用于讀取集群中的服務(wù)器節(jié)點(diǎn)的信息。

            getMaster 方法和 getASecondary 方法分別可用于以獲取 master 節(jié)點(diǎn)和 secondary 節(jié)點(diǎn)。
            內(nèi)部類 ReplicaSetStatus.Node 包含了節(jié)點(diǎn)的狀態(tài)信息,內(nèi)部類 ReplicaSetStatus.Node.Updater 用于實(shí)例化一個定時更新節(jié)點(diǎn)狀態(tài)的線程。

            getMaster 方法和 getASecondary 方法分析如下:
              // 獲取 master 服務(wù)器地址
              ServerAddress getMaster(){
                  // 獲取 master 服務(wù)器節(jié)點(diǎn)
                  Node n = getMasterNode();
                  if ( n == null )
                      return null;
          
                  // 返回節(jié)點(diǎn)地址
                  return n._addr;
              }
          
              // 獲取 mater 服務(wù)器節(jié)點(diǎn)
              Node getMasterNode(){
                  // 檢查數(shù)據(jù)連是否已經(jīng)關(guān)閉接
                  _checkClosed();
          
                  // 遍歷所有節(jié)點(diǎn),找到 master 服務(wù)器節(jié)點(diǎn)
                  for ( int i=0; i<_all.size(); i++ ){
                      Node n = _all.get(i);
                      if ( n.master() )
                          return n;
                  }
                  return null;
              }
          
              // 獲取一個最佳的 secondary 服務(wù)器地址
              ServerAddress getASecondary(){
                  // 檢查數(shù)據(jù)連是否已經(jīng)關(guān)閉接
                  _checkClosed();
          
                  Node best = null;
                  double badBeforeBest = 0;
          
                  // 隨機(jī)選取起點(diǎn)
                  int start = _random.nextInt( _all.size() );
          
                  double mybad = 0;
          
                  for ( int i=0; i<_all.size(); i++ ){
                      Node n = _all.get( ( start + i ) % _all.size() );
          
                      // 不是 secondary 節(jié)點(diǎn),跳過
                      if ( ! n.secondary() ){
                          mybad++;
                          continue;
                      }
          
                      // 找到第一個  secondary 節(jié)點(diǎn)
                      // 設(shè)置 best,繼續(xù)查找
                      if ( best == null )
                          best = n;
                          badBeforeBest = mybad;
                          mybad = 0;
                          continue;
                      }
          
                      // 第 n 個 secondary 節(jié)點(diǎn)
                      // 與之前找到的節(jié)點(diǎn)比較,選用最好的
                      // 比較 ping 值
                      long diff = best._pingTime - n._pingTime;
                      if ( diff > slaveAcceptableLatencyMS ||
                           // 一種保證隨機(jī)分布的算法
                           ( ( badBeforeBest - mybad ) / ( _all.size() - 1 ) ) > _random.nextDouble() )
                          {
                          best = n;
                          badBeforeBest = mybad;
                          mybad = 0;
                      }
          
                  }
          
                  if ( best == null )
                      return null;
          
                  // 返回 best 的地址
                  return best._addr;
              }
          

          包含節(jié)點(diǎn)狀態(tài)信息的內(nèi)部類 ReplicaSetStatus.Node

            ReplicaSetStatus.Node 包含了節(jié)點(diǎn)的信息:
          final ServerAddress _addr; // 地址
          final Set<String> _names = Collections.synchronizedSet( new HashSet<String>() ); // 節(jié)點(diǎn)名稱
          DBPort _port; // 數(shù)據(jù)庫端口
          
          boolean _ok = false;  // 狀態(tài)是否正常
          long _lastCheck = 0;  // 上次檢查時間
          long _pingTime = 0;  // ping 延時
          
          boolean _isMaster = false;  // 是否為 master 節(jié)點(diǎn)
          boolean _isSecondary = false;  // 是否為 secondary 節(jié)點(diǎn)
          
          double _priority = 0; // 優(yōu)先級
          


            另外,它也提供了更新節(jié)點(diǎn)的方法 upadate 和 updateAll:
                  // 更新節(jié)點(diǎn)狀態(tài)
                  synchronized void update(Set<Node> seenNodes){
                      try {
          
                          // 發(fā)送 admin 請求,檢查狀態(tài) 
                          long start = System.currentTimeMillis();
                          CommandResult res = _port.runCommand( _mongo.getDB("admin") , _isMasterCmd );
                          _lastCheck = System.currentTimeMillis();
                          _pingTime = _lastCheck - start;
          
                          // 狀態(tài)異常
                          if ( res == null ){
                              _ok = false;
                              return;
                          }
          
                          // 狀態(tài)正常
                          _ok = true;
           
                         // 是 mater 節(jié)點(diǎn)
                          _isMaster = res.getBoolean( "ismaster" , false );
          
                          // 是 secondary 節(jié)點(diǎn) 
                          _isSecondary = res.getBoolean( "secondary" , false );
          
                          // 是 primary 節(jié)點(diǎn)
                          _lastPrimarySignal = res.getString( "primary" );
          
                          // 獲取 hosts 信息
                          if ( res.containsField( "hosts" ) ){
                              for ( Object x : (List)res.get("hosts") ){
                                  String host = x.toString();
                                  Node node = _addIfNotHere(host);
                                  if (node != null && seenNodes != null)
                                      seenNodes.add(node);
                              }
                          }
          
                          // 獲取 passives 信息
                          if ( res.containsField( "passives" ) ){
                              for ( Object x : (List)res.get("passives") ){
                                  String host = x.toString();
                                  Node node = _addIfNotHere(host);
                                  if (node != null && seenNodes != null)
                                      seenNodes.add(node);
                              }
                          }
          
                          // 獲取 maxBsonObjectSize
                          if (_isMaster ) {
                              if (res.containsField("maxBsonObjectSize"))
                                  maxBsonObjectSize = ((Integer)res.get( "maxBsonObjectSize" )).intValue();
                              else
                                  maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
                          }
          
                          // 獲取 setName
                          if (res.containsField("setName")) {
          	                String setName = res.get( "setName" ).toString();
          	                if ( _setName == null ){
          	                    _setName = setName;
          	                    _logger = Logger.getLogger( _rootLogger.getName() + "." + setName );
          	                }
          	                else if ( !_setName.equals( setName ) ){
          	                    _logger.log( Level.SEVERE , "mis match set name old: " + _setName + " new: " + setName );
          	                    return;
          	                }
                          }
          
                      }
                      catch ( ... ){
                          // ...
                      }
                  }
          
              // 更新所有節(jié)點(diǎn)狀態(tài)
              synchronized void updateAll(){
                  HashSet<Node> seenNodes = new HashSet<Node>();
          
                 // 遍歷更新所有節(jié)點(diǎn)
                 for ( int i=0; i<_all.size(); i++ ){
                      Node n = _all.get(i);
                      n.update(seenNodes);
                  }
          
                  // 移除已經(jīng)不存在的節(jié)點(diǎn)
                  if (!seenNodes.isEmpty()) {
                      // not empty, means that at least 1 server gave node list
                      // remove unused hosts
                      Iterator<Node> it = _all.iterator();
                      while (it.hasNext()) {
                          if (!seenNodes.contains(it.next()))
                              it.remove();
                      }
                  }
              }
          

          定時更新節(jié)點(diǎn)狀態(tài)的內(nèi)部類  ReplicaSetStatus.Node.Updater

            ReplicaSetStatus.Node.Updater 繼承了 Thread,可以實(shí)例化一個定時更新節(jié)點(diǎn)狀態(tài)的線程。
                  // 覆寫 Thread 類的 run 方法
                  public void run(){
                      while ( ! _closed ){
                          try {
                              // 更新所有節(jié)點(diǎn)狀態(tài)
                              updateAll();
          
                              // 如果當(dāng)前時間大于 _nextResolveTime
                              // 則更新所有節(jié)點(diǎn)并設(shè)置 _nextResolveTime
                              long now = System.currentTimeMillis();
                              if (inetAddrCacheMS > 0 && _nextResolveTime < now) {
                                  _nextResolveTime = now + inetAddrCacheMS;
                                  for (Node node : _all) {
                                      node.updateAddr();
                                  }
                              }
          
                              // 檢查 master ,以避免更新帶來的不同步
                              _mongo.getConnector().checkMaster(true, false);
                          }
                          catch ( Exception e ){
                              _logger.log( Level.WARNING , "couldn't do update pass" , e );
                          }
          
                          // sleep一段時間,等待下次更新
                          try {
                              Thread.sleep( updaterIntervalMS );
                          }
                          catch ( InterruptedException ie ){
                          }
          
                      }
                  }
          





          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 阳新县| 天长市| 华宁县| 阿巴嘎旗| 宕昌县| 牟定县| 翼城县| 屏东县| 大新县| 略阳县| 鸡泽县| 浙江省| 资中县| 蒙自县| 萨迦县| 永平县| 安宁市| 毕节市| 绥阳县| 饶平县| 奉新县| 平定县| 荔波县| 衡山县| 建瓯市| 阿克陶县| 文水县| 米易县| 庐江县| 乌拉特后旗| 牙克石市| 临泽县| 海安县| 田林县| 介休市| 福建省| 宜章县| 柞水县| 板桥市| 滕州市| 怀化市|