DBPort 是表示數據庫端口的類,分別用 call 和 say 方法實現讀取和寫入操作。
這兩個方法都調用了 go 方法。
DBProt 的 go 方法調用了 OutMessage 的 prepare、pipe 等方法,實際上這些方法又是間接地通過 PoolOutputBuffer 實現的,這將在后面的文章中提到。
另外 DBPort 的 open 方法用于打開數據連接:
這兩個方法都調用了 go 方法。
// 讀取操作 Response call( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll ); } // 寫入操作 void say( OutMessage msg ) throws IOException { go( msg , null ); } // 執行操作 private synchronized Response go( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll , false ); } // 執行操作 private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse ) throws IOException { // 正在處理請求 if ( _processingResponse ){ if ( coll == null ){ // this could be a pipeline and should be safe } else { // this could cause issues since we're reading data off the wire throw new IllegalStateException( "DBPort.go called and expecting a response while processing another response" ); } } // 增加調用次數計數 _calls++; // _sorket 為空,打開連接 if ( _socket == null ) _open(); if ( _out == null ) throw new IllegalStateException( "_out shouldn't be null" ); try { // 準備消息 msg.prepare(); // 輸出 msg.pipe( _out ); if ( _pool != null ) _pool._everWorked = true; if ( coll == null && ! forceReponse ) return null; _processingResponse = true; // 返回結果 return new Response( _sa , coll , _in , _decoder); } catch ( IOException ioe ){ close(); throw ioe; } finally { _processingResponse = false; } }
DBProt 的 go 方法調用了 OutMessage 的 prepare、pipe 等方法,實際上這些方法又是間接地通過 PoolOutputBuffer 實現的,這將在后面的文章中提到。
另外 DBPort 的 open 方法用于打開數據連接:
// 打開連接 boolean _open() throws IOException { long sleepTime = 100; final long start = System.currentTimeMillis(); while ( true ){ IOException lastError = null; try { // 創建 socket 并連接 _socket = new Socket(); _socket.connect( _addr , _options.connectTimeout ); // 設置 socket 參數 _socket.setTcpNoDelay( ! USE_NAGLE ); _socket.setKeepAlive( _options.socketKeepAlive ); _socket.setSoTimeout( _options.socketTimeout ); // 獲取輸入輸出流 _in = new BufferedInputStream( _socket.getInputStream() ); _out = _socket.getOutputStream(); return true; } catch ( IOException ioe ){ // ... } if ( ! _options.autoConnectRetry || ( _pool != null && ! _pool._everWorked ) ) throw lastError; // 超時處理 long sleptSoFar = System.currentTimeMillis() - start; if ( sleptSoFar >= CONN_RETRY_TIME_MS ) throw lastError; if ( sleepTime + sleptSoFar > CONN_RETRY_TIME_MS ) sleepTime = CONN_RETRY_TIME_MS - sleptSoFar; // 等待重試 _logger.severe( "going to sleep and retry. total sleep time after = " + ( sleptSoFar + sleptSoFar ) + "ms this time:" + sleepTime + "ms" ); ThreadUtil.sleep( sleepTime ); sleepTime *= 2; } }