我的家園

          我的家園
            RelicaSetStatus 用于讀取集群中的服務器節點的信息。

            getMaster 方法和 getASecondary 方法分別可用于以獲取 master 節點和 secondary 節點。
            內部類 ReplicaSetStatus.Node 包含了節點的狀態信息,內部類 ReplicaSetStatus.Node.Updater 用于實例化一個定時更新節點狀態的線程。

            getMaster 方法和 getASecondary 方法分析如下:
              // 獲取 master 服務器地址
              ServerAddress getMaster(){
                  // 獲取 master 服務器節點
                  Node n = getMasterNode();
                  if ( n == null )
                      return null;
          
                  // 返回節點地址
                  return n._addr;
              }
          
              // 獲取 mater 服務器節點
              Node getMasterNode(){
                  // 檢查數據連是否已經關閉接
                  _checkClosed();
          
                  // 遍歷所有節點,找到 master 服務器節點
                  for ( int i=0; i<_all.size(); i++ ){
                      Node n = _all.get(i);
                      if ( n.master() )
                          return n;
                  }
                  return null;
              }
          
              // 獲取一個最佳的 secondary 服務器地址
              ServerAddress getASecondary(){
                  // 檢查數據連是否已經關閉接
                  _checkClosed();
          
                  Node best = null;
                  double badBeforeBest = 0;
          
                  // 隨機選取起點
                  int start = _random.nextInt( _all.size() );
          
                  double mybad = 0;
          
                  for ( int i=0; i<_all.size(); i++ ){
                      Node n = _all.get( ( start + i ) % _all.size() );
          
                      // 不是 secondary 節點,跳過
                      if ( ! n.secondary() ){
                          mybad++;
                          continue;
                      }
          
                      // 找到第一個  secondary 節點
                      // 設置 best,繼續查找
                      if ( best == null )
                          best = n;
                          badBeforeBest = mybad;
                          mybad = 0;
                          continue;
                      }
          
                      // 第 n 個 secondary 節點
                      // 與之前找到的節點比較,選用最好的
                      // 比較 ping 值
                      long diff = best._pingTime - n._pingTime;
                      if ( diff > slaveAcceptableLatencyMS ||
                           // 一種保證隨機分布的算法
                           ( ( badBeforeBest - mybad ) / ( _all.size() - 1 ) ) > _random.nextDouble() )
                          {
                          best = n;
                          badBeforeBest = mybad;
                          mybad = 0;
                      }
          
                  }
          
                  if ( best == null )
                      return null;
          
                  // 返回 best 的地址
                  return best._addr;
              }
          

          包含節點狀態信息的內部類 ReplicaSetStatus.Node

            ReplicaSetStatus.Node 包含了節點的信息:
          final ServerAddress _addr; // 地址
          final Set<String> _names = Collections.synchronizedSet( new HashSet<String>() ); // 節點名稱
          DBPort _port; // 數據庫端口
          
          boolean _ok = false;  // 狀態是否正常
          long _lastCheck = 0;  // 上次檢查時間
          long _pingTime = 0;  // ping 延時
          
          boolean _isMaster = false;  // 是否為 master 節點
          boolean _isSecondary = false;  // 是否為 secondary 節點
          
          double _priority = 0; // 優先級
          


            另外,它也提供了更新節點的方法 upadate 和 updateAll:
                  // 更新節點狀態
                  synchronized void update(Set<Node> seenNodes){
                      try {
          
                          // 發送 admin 請求,檢查狀態 
                          long start = System.currentTimeMillis();
                          CommandResult res = _port.runCommand( _mongo.getDB("admin") , _isMasterCmd );
                          _lastCheck = System.currentTimeMillis();
                          _pingTime = _lastCheck - start;
          
                          // 狀態異常
                          if ( res == null ){
                              _ok = false;
                              return;
                          }
          
                          // 狀態正常
                          _ok = true;
           
                         // 是 mater 節點
                          _isMaster = res.getBoolean( "ismaster" , false );
          
                          // 是 secondary 節點 
                          _isSecondary = res.getBoolean( "secondary" , false );
          
                          // 是 primary 節點
                          _lastPrimarySignal = res.getString( "primary" );
          
                          // 獲取 hosts 信息
                          if ( res.containsField( "hosts" ) ){
                              for ( Object x : (List)res.get("hosts") ){
                                  String host = x.toString();
                                  Node node = _addIfNotHere(host);
                                  if (node != null && seenNodes != null)
                                      seenNodes.add(node);
                              }
                          }
          
                          // 獲取 passives 信息
                          if ( res.containsField( "passives" ) ){
                              for ( Object x : (List)res.get("passives") ){
                                  String host = x.toString();
                                  Node node = _addIfNotHere(host);
                                  if (node != null && seenNodes != null)
                                      seenNodes.add(node);
                              }
                          }
          
                          // 獲取 maxBsonObjectSize
                          if (_isMaster ) {
                              if (res.containsField("maxBsonObjectSize"))
                                  maxBsonObjectSize = ((Integer)res.get( "maxBsonObjectSize" )).intValue();
                              else
                                  maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
                          }
          
                          // 獲取 setName
                          if (res.containsField("setName")) {
          	                String setName = res.get( "setName" ).toString();
          	                if ( _setName == null ){
          	                    _setName = setName;
          	                    _logger = Logger.getLogger( _rootLogger.getName() + "." + setName );
          	                }
          	                else if ( !_setName.equals( setName ) ){
          	                    _logger.log( Level.SEVERE , "mis match set name old: " + _setName + " new: " + setName );
          	                    return;
          	                }
                          }
          
                      }
                      catch ( ... ){
                          // ...
                      }
                  }
          
              // 更新所有節點狀態
              synchronized void updateAll(){
                  HashSet<Node> seenNodes = new HashSet<Node>();
          
                 // 遍歷更新所有節點
                 for ( int i=0; i<_all.size(); i++ ){
                      Node n = _all.get(i);
                      n.update(seenNodes);
                  }
          
                  // 移除已經不存在的節點
                  if (!seenNodes.isEmpty()) {
                      // not empty, means that at least 1 server gave node list
                      // remove unused hosts
                      Iterator<Node> it = _all.iterator();
                      while (it.hasNext()) {
                          if (!seenNodes.contains(it.next()))
                              it.remove();
                      }
                  }
              }
          

          定時更新節點狀態的內部類  ReplicaSetStatus.Node.Updater

            ReplicaSetStatus.Node.Updater 繼承了 Thread,可以實例化一個定時更新節點狀態的線程。
                  // 覆寫 Thread 類的 run 方法
                  public void run(){
                      while ( ! _closed ){
                          try {
                              // 更新所有節點狀態
                              updateAll();
          
                              // 如果當前時間大于 _nextResolveTime
                              // 則更新所有節點并設置 _nextResolveTime
                              long now = System.currentTimeMillis();
                              if (inetAddrCacheMS > 0 && _nextResolveTime < now) {
                                  _nextResolveTime = now + inetAddrCacheMS;
                                  for (Node node : _all) {
                                      node.updateAddr();
                                  }
                              }
          
                              // 檢查 master ,以避免更新帶來的不同步
                              _mongo.getConnector().checkMaster(true, false);
                          }
                          catch ( Exception e ){
                              _logger.log( Level.WARNING , "couldn't do update pass" , e );
                          }
          
                          // sleep一段時間,等待下次更新
                          try {
                              Thread.sleep( updaterIntervalMS );
                          }
                          catch ( InterruptedException ie ){
                          }
          
                      }
                  }
          





          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 屯留县| 周口市| 增城市| 固安县| 舟曲县| 偃师市| 兴和县| 乐平市| 米易县| 华亭县| 淮安市| 莒南县| 化州市| 堆龙德庆县| 元氏县| 虹口区| 怀仁县| 阿克苏市| 泸水县| 甘孜县| 溧水县| 麦盖提县| 安新县| 贵阳市| 新龙县| 林甸县| 白银市| 焦作市| 定南县| 庆阳市| 深州市| 宁乡县| 酒泉市| 西丰县| 内黄县| 绿春县| 牟定县| 和田县| 波密县| 唐山市| 海淀区|