MongoDB Java Driver 源碼分析(6):com.mongodb.DBTCPConnector
Posted on 2012-04-15 16:27 zljpp 閱讀(289) 評(píng)論(0) 編輯 收藏
DBTCPConnecror 是對(duì) DBPort 類的封裝,借助 DBPort 實(shí)現(xiàn)讀寫操作、獲取服務(wù)器狀態(tài)等。
DBTCPConnecror 類中比較值得分析的是 say 方法和 call 方法的實(shí)現(xiàn):
這兩個(gè)方法的實(shí)現(xiàn)方式相似,實(shí)際上都是通過(guò) DBPort 實(shí)現(xiàn),同時(shí)增加了檢查連接和錯(cuò)誤處理的代碼。
這里僅以 say 方法的實(shí)現(xiàn)作為例子進(jìn)行說(shuō)明:
say 和 call 方式是借助 DBPort 實(shí)現(xiàn)的,而 DBPort 對(duì)象是通過(guò)內(nèi)部類 DBTCPConnector.MyPort 的 get 方法獲取的:
getServerAddressList (獲取服務(wù)器地址列表),checkMaster(檢查 master 服務(wù)器)和 fetchMaxBsonObjectSize (獲取 maxBsonObjectSize) 也有一定的分析價(jià)值:
say 方法和 call 方法
DBTCPConnecror 類中比較值得分析的是 say 方法和 call 方法的實(shí)現(xiàn):
// 執(zhí)行寫操作 WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) // 執(zhí)行讀操作 Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries )
這兩個(gè)方法的實(shí)現(xiàn)方式相似,實(shí)際上都是通過(guò) DBPort 實(shí)現(xiàn),同時(shí)增加了檢查連接和錯(cuò)誤處理的代碼。
這里僅以 say 方法的實(shí)現(xiàn)作為例子進(jìn)行說(shuō)明:
public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) throws MongoException { // 檢查數(shù)據(jù)庫(kù)連接 _checkClosed(); checkMaster( false , true ); // 獲取數(shù)據(jù)連接端口 MyPort mp = _myPort.get(); DBPort port = mp.get( true , false , hostNeeded ); try { // 檢查權(quán)限 port.checkAuth( db ); // 通過(guò) DBPort 實(shí)現(xiàn)寫操作 port.say( m ); // 檢查錯(cuò)誤或返回正確的結(jié)果 if ( concern.callGetLastError() ){ return _checkWriteError( db , mp , port , concern ); } else { return new WriteResult( db , port , concern ); } } catch (...){ // ... } finally { // 結(jié)束操作 mp.done( port ); m.doneWithMessage(); } }
say 和 call 方式是借助 DBPort 實(shí)現(xiàn)的,而 DBPort 對(duì)象是通過(guò)內(nèi)部類 DBTCPConnector.MyPort 的 get 方法獲取的:
// 獲取數(shù)據(jù)庫(kù)端口 DBPort get( boolean keep , boolean slaveOk , ServerAddress hostNeeded ){ // 如果指定了服務(wù)器,就獲取指定服務(wù)器的端口 if ( hostNeeded != null ){ // asked for a specific host return _portHolder.get(hostNeeded ).get(); } // 在一個(gè)請(qǐng)求中,如果已經(jīng)使用了一個(gè)端口,則繼續(xù)使用它 if ( _requestPort != null ){ if ( _requestPort.getPool() == _masterPortPool || !keep ) { return _requestPort; } _requestPort.getPool().done(_requestPort); _requestPort = null; } // 集群部署,可以從 Slave 獲取端口 if ( slaveOk && _rsStatus != null ){ // if slaveOk, try to use a secondary ServerAddress slave = _rsStatus.getASecondary(); if ( slave != null ){ return _portHolder.get( slave ).get(); } } // master 端口池為空,拋異常 if (_masterPortPool == null) { throw new MongoException("Rare case where master=null, probably all servers are down"); } // 通過(guò) master 端口池獲取端口 DBPort p = _masterPortPool.get(); if ( keep && _inRequest ) { _requestPort = p; } return p; }
其他方法
getServerAddressList (獲取服務(wù)器地址列表),checkMaster(檢查 master 服務(wù)器)和 fetchMaxBsonObjectSize (獲取 maxBsonObjectSize) 也有一定的分析價(jià)值:
// 獲取服務(wù)器地址列表 public List<ServerAddress> getServerAddressList() { // 如果是集群方式,則通過(guò) ReplicaSetStatus 返回地址列表 if (_rsStatus != null) { return _rsStatus.getServerAddressList(); } // 否則,返回 master 服務(wù)器的地址 ServerAddress master = getAddress(); if (master != null) { List<ServerAddress> list = new ArrayList<ServerAddress>(); list.add(master); return list; } return null; } // 檢查 master 服務(wù)器 void checkMaster( boolean force , boolean failIfNoMaster ) throws MongoException { // 檢查是集群部署還是單機(jī)部署 if ( _rsStatus != null ){ if ( _masterPortPool == null || force ){ // 集群部署 // 通過(guò) ReplicaSetStatus 獲取 master 節(jié)點(diǎn) ReplicaSetStatus.Node n = _rsStatus.ensureMaster(); if ( n == null ){ if ( failIfNoMaster ) throw new MongoException( "can't find a master" ); } else { // 根據(jù) master 節(jié)點(diǎn)的信息設(shè)置 DBTCPConnector 的屬性 _set( n._addr ); maxBsonObjectSize = _rsStatus.getMaxBsonObjectSize(); } } } else { // 單機(jī)部署 // 根據(jù)服務(wù)器信息設(shè)置 maxBsonObjectSize if (maxBsonObjectSize == 0) maxBsonObjectSize = fetchMaxBsonObjectSize(); } } // 獲取并設(shè)置 maxBsonObjectSize int fetchMaxBsonObjectSize() { if (_masterPortPool == null) return 0; DBPort port = _masterPortPool.get(); try { // 連接 admin 數(shù)據(jù)庫(kù),執(zhí)行檢查 master 服務(wù)器的命令 CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1)); // 1.8 之后的版本返回的結(jié)果中包含 maxBsonObjectSize if (res.containsField("maxBsonObjectSize")) { maxBsonObjectSize = ((Integer) res.get("maxBsonObjectSize")).intValue(); } else { maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE; } } catch (Exception e) { _logger.log(Level.WARNING, null, e); } finally { port.getPool().done(port); } return maxBsonObjectSize; }