MongoDB Java Driver 源碼分析(6):com.mongodb.DBTCPConnector
Posted on 2012-04-15 16:27 zljpp 閱讀(289) 評論(0) 編輯 收藏
DBTCPConnecror 是對 DBPort 類的封裝,借助 DBPort 實現讀寫操作、獲取服務器狀態等。
DBTCPConnecror 類中比較值得分析的是 say 方法和 call 方法的實現:
這兩個方法的實現方式相似,實際上都是通過 DBPort 實現,同時增加了檢查連接和錯誤處理的代碼。
這里僅以 say 方法的實現作為例子進行說明:
say 和 call 方式是借助 DBPort 實現的,而 DBPort 對象是通過內部類 DBTCPConnector.MyPort 的 get 方法獲取的:
getServerAddressList (獲取服務器地址列表),checkMaster(檢查 master 服務器)和 fetchMaxBsonObjectSize (獲取 maxBsonObjectSize) 也有一定的分析價值:
say 方法和 call 方法
DBTCPConnecror 類中比較值得分析的是 say 方法和 call 方法的實現:
// 執行寫操作 WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) // 執行讀操作 Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries )
這兩個方法的實現方式相似,實際上都是通過 DBPort 實現,同時增加了檢查連接和錯誤處理的代碼。
這里僅以 say 方法的實現作為例子進行說明:
public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) throws MongoException { // 檢查數據庫連接 _checkClosed(); checkMaster( false , true ); // 獲取數據連接端口 MyPort mp = _myPort.get(); DBPort port = mp.get( true , false , hostNeeded ); try { // 檢查權限 port.checkAuth( db ); // 通過 DBPort 實現寫操作 port.say( m ); // 檢查錯誤或返回正確的結果 if ( concern.callGetLastError() ){ return _checkWriteError( db , mp , port , concern ); } else { return new WriteResult( db , port , concern ); } } catch (...){ // ... } finally { // 結束操作 mp.done( port ); m.doneWithMessage(); } }
say 和 call 方式是借助 DBPort 實現的,而 DBPort 對象是通過內部類 DBTCPConnector.MyPort 的 get 方法獲取的:
// 獲取數據庫端口 DBPort get( boolean keep , boolean slaveOk , ServerAddress hostNeeded ){ // 如果指定了服務器,就獲取指定服務器的端口 if ( hostNeeded != null ){ // asked for a specific host return _portHolder.get(hostNeeded ).get(); } // 在一個請求中,如果已經使用了一個端口,則繼續使用它 if ( _requestPort != null ){ if ( _requestPort.getPool() == _masterPortPool || !keep ) { return _requestPort; } _requestPort.getPool().done(_requestPort); _requestPort = null; } // 集群部署,可以從 Slave 獲取端口 if ( slaveOk && _rsStatus != null ){ // if slaveOk, try to use a secondary ServerAddress slave = _rsStatus.getASecondary(); if ( slave != null ){ return _portHolder.get( slave ).get(); } } // master 端口池為空,拋異常 if (_masterPortPool == null) { throw new MongoException("Rare case where master=null, probably all servers are down"); } // 通過 master 端口池獲取端口 DBPort p = _masterPortPool.get(); if ( keep && _inRequest ) { _requestPort = p; } return p; }
其他方法
getServerAddressList (獲取服務器地址列表),checkMaster(檢查 master 服務器)和 fetchMaxBsonObjectSize (獲取 maxBsonObjectSize) 也有一定的分析價值:
// 獲取服務器地址列表 public List<ServerAddress> getServerAddressList() { // 如果是集群方式,則通過 ReplicaSetStatus 返回地址列表 if (_rsStatus != null) { return _rsStatus.getServerAddressList(); } // 否則,返回 master 服務器的地址 ServerAddress master = getAddress(); if (master != null) { List<ServerAddress> list = new ArrayList<ServerAddress>(); list.add(master); return list; } return null; } // 檢查 master 服務器 void checkMaster( boolean force , boolean failIfNoMaster ) throws MongoException { // 檢查是集群部署還是單機部署 if ( _rsStatus != null ){ if ( _masterPortPool == null || force ){ // 集群部署 // 通過 ReplicaSetStatus 獲取 master 節點 ReplicaSetStatus.Node n = _rsStatus.ensureMaster(); if ( n == null ){ if ( failIfNoMaster ) throw new MongoException( "can't find a master" ); } else { // 根據 master 節點的信息設置 DBTCPConnector 的屬性 _set( n._addr ); maxBsonObjectSize = _rsStatus.getMaxBsonObjectSize(); } } } else { // 單機部署 // 根據服務器信息設置 maxBsonObjectSize if (maxBsonObjectSize == 0) maxBsonObjectSize = fetchMaxBsonObjectSize(); } } // 獲取并設置 maxBsonObjectSize int fetchMaxBsonObjectSize() { if (_masterPortPool == null) return 0; DBPort port = _masterPortPool.get(); try { // 連接 admin 數據庫,執行檢查 master 服務器的命令 CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1)); // 1.8 之后的版本返回的結果中包含 maxBsonObjectSize if (res.containsField("maxBsonObjectSize")) { maxBsonObjectSize = ((Integer) res.get("maxBsonObjectSize")).intValue(); } else { maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE; } } catch (Exception e) { _logger.log(Level.WARNING, null, e); } finally { port.getPool().done(port); } return maxBsonObjectSize; }