MongoDB Java Driver 源碼分析(8):com.mongodb.RelicaSetStatus
Posted on 2012-04-15 16:27 zljpp 閱讀(165) 評論(0) 編輯 收藏
RelicaSetStatus 用于讀取集群中的服務器節點的信息。
getMaster 方法和 getASecondary 方法分別可用于以獲取 master 節點和 secondary 節點。
內部類 ReplicaSetStatus.Node 包含了節點的狀態信息,內部類 ReplicaSetStatus.Node.Updater 用于實例化一個定時更新節點狀態的線程。
getMaster 方法和 getASecondary 方法分析如下:
ReplicaSetStatus.Node 包含了節點的信息:
另外,它也提供了更新節點的方法 upadate 和 updateAll:
ReplicaSetStatus.Node.Updater 繼承了 Thread,可以實例化一個定時更新節點狀態的線程。
getMaster 方法和 getASecondary 方法分別可用于以獲取 master 節點和 secondary 節點。
內部類 ReplicaSetStatus.Node 包含了節點的狀態信息,內部類 ReplicaSetStatus.Node.Updater 用于實例化一個定時更新節點狀態的線程。
getMaster 方法和 getASecondary 方法分析如下:
// 獲取 master 服務器地址 ServerAddress getMaster(){ // 獲取 master 服務器節點 Node n = getMasterNode(); if ( n == null ) return null; // 返回節點地址 return n._addr; } // 獲取 mater 服務器節點 Node getMasterNode(){ // 檢查數據連是否已經關閉接 _checkClosed(); // 遍歷所有節點,找到 master 服務器節點 for ( int i=0; i<_all.size(); i++ ){ Node n = _all.get(i); if ( n.master() ) return n; } return null; } // 獲取一個最佳的 secondary 服務器地址 ServerAddress getASecondary(){ // 檢查數據連是否已經關閉接 _checkClosed(); Node best = null; double badBeforeBest = 0; // 隨機選取起點 int start = _random.nextInt( _all.size() ); double mybad = 0; for ( int i=0; i<_all.size(); i++ ){ Node n = _all.get( ( start + i ) % _all.size() ); // 不是 secondary 節點,跳過 if ( ! n.secondary() ){ mybad++; continue; } // 找到第一個 secondary 節點 // 設置 best,繼續查找 if ( best == null ) best = n; badBeforeBest = mybad; mybad = 0; continue; } // 第 n 個 secondary 節點 // 與之前找到的節點比較,選用最好的 // 比較 ping 值 long diff = best._pingTime - n._pingTime; if ( diff > slaveAcceptableLatencyMS || // 一種保證隨機分布的算法 ( ( badBeforeBest - mybad ) / ( _all.size() - 1 ) ) > _random.nextDouble() ) { best = n; badBeforeBest = mybad; mybad = 0; } } if ( best == null ) return null; // 返回 best 的地址 return best._addr; }
包含節點狀態信息的內部類 ReplicaSetStatus.Node
ReplicaSetStatus.Node 包含了節點的信息:
final ServerAddress _addr; // 地址 final Set<String> _names = Collections.synchronizedSet( new HashSet<String>() ); // 節點名稱 DBPort _port; // 數據庫端口 boolean _ok = false; // 狀態是否正常 long _lastCheck = 0; // 上次檢查時間 long _pingTime = 0; // ping 延時 boolean _isMaster = false; // 是否為 master 節點 boolean _isSecondary = false; // 是否為 secondary 節點 double _priority = 0; // 優先級
另外,它也提供了更新節點的方法 upadate 和 updateAll:
// 更新節點狀態 synchronized void update(Set<Node> seenNodes){ try { // 發送 admin 請求,檢查狀態 long start = System.currentTimeMillis(); CommandResult res = _port.runCommand( _mongo.getDB("admin") , _isMasterCmd ); _lastCheck = System.currentTimeMillis(); _pingTime = _lastCheck - start; // 狀態異常 if ( res == null ){ _ok = false; return; } // 狀態正常 _ok = true; // 是 mater 節點 _isMaster = res.getBoolean( "ismaster" , false ); // 是 secondary 節點 _isSecondary = res.getBoolean( "secondary" , false ); // 是 primary 節點 _lastPrimarySignal = res.getString( "primary" ); // 獲取 hosts 信息 if ( res.containsField( "hosts" ) ){ for ( Object x : (List)res.get("hosts") ){ String host = x.toString(); Node node = _addIfNotHere(host); if (node != null && seenNodes != null) seenNodes.add(node); } } // 獲取 passives 信息 if ( res.containsField( "passives" ) ){ for ( Object x : (List)res.get("passives") ){ String host = x.toString(); Node node = _addIfNotHere(host); if (node != null && seenNodes != null) seenNodes.add(node); } } // 獲取 maxBsonObjectSize if (_isMaster ) { if (res.containsField("maxBsonObjectSize")) maxBsonObjectSize = ((Integer)res.get( "maxBsonObjectSize" )).intValue(); else maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE; } // 獲取 setName if (res.containsField("setName")) { String setName = res.get( "setName" ).toString(); if ( _setName == null ){ _setName = setName; _logger = Logger.getLogger( _rootLogger.getName() + "." + setName ); } else if ( !_setName.equals( setName ) ){ _logger.log( Level.SEVERE , "mis match set name old: " + _setName + " new: " + setName ); return; } } } catch ( ... ){ // ... } } // 更新所有節點狀態 synchronized void updateAll(){ HashSet<Node> seenNodes = new HashSet<Node>(); // 遍歷更新所有節點 for ( int i=0; i<_all.size(); i++ ){ Node n = _all.get(i); n.update(seenNodes); } // 移除已經不存在的節點 if (!seenNodes.isEmpty()) { // not empty, means that at least 1 server gave node list // remove unused hosts Iterator<Node> it = _all.iterator(); while (it.hasNext()) { if (!seenNodes.contains(it.next())) it.remove(); } } }
定時更新節點狀態的內部類 ReplicaSetStatus.Node.Updater
ReplicaSetStatus.Node.Updater 繼承了 Thread,可以實例化一個定時更新節點狀態的線程。
// 覆寫 Thread 類的 run 方法 public void run(){ while ( ! _closed ){ try { // 更新所有節點狀態 updateAll(); // 如果當前時間大于 _nextResolveTime // 則更新所有節點并設置 _nextResolveTime long now = System.currentTimeMillis(); if (inetAddrCacheMS > 0 && _nextResolveTime < now) { _nextResolveTime = now + inetAddrCacheMS; for (Node node : _all) { node.updateAddr(); } } // 檢查 master ,以避免更新帶來的不同步 _mongo.getConnector().checkMaster(true, false); } catch ( Exception e ){ _logger.log( Level.WARNING , "couldn't do update pass" , e ); } // sleep一段時間,等待下次更新 try { Thread.sleep( updaterIntervalMS ); } catch ( InterruptedException ie ){ } } }