由于Dubbo底層采用Socket進(jìn)行通信,自己對(duì)通信理理論也不是很清楚,所以順便把通信的知識(shí)也學(xué)習(xí)一下。
n 通信理論
計(jì)算機(jī)與外界的信息交換稱(chēng)為通信。基本的通信方法有并行通信和串行通信兩種。
1.一組信息(通常是字節(jié))的各位數(shù)據(jù)被同時(shí)傳送的通信方法稱(chēng)為并行通信。并行通信依靠并行I/O接口實(shí)現(xiàn)。并行通信速度快,但傳輸線(xiàn)根數(shù)多,只適用于近距離(相距數(shù)公尺)的通信。
2.一組信息的各位數(shù)據(jù)被逐位順序傳送的通信方式稱(chēng)為串行通信。串行通信可通過(guò)串行接口來(lái)實(shí)現(xiàn)。串行通信速度慢,但傳輸線(xiàn)少,適宜長(zhǎng)距離通信。
串行通信按信息傳送方向分為以下3種:
1) 單工
只能一個(gè)方向傳輸數(shù)據(jù)

2) 半雙工
信息能雙向傳輸,但不能同時(shí)雙向傳輸

3) 全雙工
能雙向傳輸并且可以同時(shí)雙向傳輸

n Socket
Socket 是一種應(yīng)用接口, TCP/IP 是網(wǎng)絡(luò)傳輸協(xié)議,雖然接口相同, 但是不同的協(xié)議會(huì)有不同的服務(wù)性質(zhì)。創(chuàng)建Socket 連接時(shí),可以指定使用的傳輸層協(xié)議,Socket 可以支持不同的傳輸層協(xié)議(TCP 或UDP ),當(dāng)使用TCP 協(xié)議進(jìn)行連接時(shí),該Socket 連接就是一個(gè)TCP 連接。Soket 跟TCP/IP 并沒(méi)有必然的聯(lián)系。Socket 編程接口在設(shè)計(jì)的時(shí)候,就希望也能適應(yīng)其他的網(wǎng)絡(luò)協(xié)議。所以,socket 的出現(xiàn)只是可以更方便的使用TCP/IP 協(xié)議棧而已。
引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html
上一個(gè)通信理論其實(shí)是想說(shuō)Socket(TCP)通信是全雙工的方式
n Dubbo遠(yuǎn)程同步調(diào)用原理分析
從Dubbo開(kāi)源文檔上了解到一個(gè)調(diào)用過(guò)程如下圖
http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference
另外文檔里有說(shuō)明:Dubbo缺省協(xié)議采用單一長(zhǎng)連接和NIO異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費(fèi)者機(jī)器數(shù)遠(yuǎn)大于服務(wù)提供者機(jī)器數(shù)的情況。

Dubbo缺省協(xié)議,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。
- 連接個(gè)數(shù):?jiǎn)芜B接
- 連接方式:長(zhǎng)連接
- 傳輸協(xié)議:TCP
- 傳輸方式:NIO異步傳輸
- 序列化:Hessian二進(jìn)制序列化
- 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較小(建議小于100K),消費(fèi)者比提供者個(gè)數(shù)多,單一消費(fèi)者無(wú)法壓滿(mǎn)提供者,盡量不要用dubbo協(xié)議傳輸大文件或超大字符串。
- 適用場(chǎng)景:常規(guī)遠(yuǎn)程服務(wù)方法調(diào)用
通常,一個(gè)典型的同步遠(yuǎn)程調(diào)用應(yīng)該是這樣的:

1, 客戶(hù)端線(xiàn)程調(diào)用遠(yuǎn)程接口,向服務(wù)端發(fā)送請(qǐng)求,同時(shí)當(dāng)前線(xiàn)程應(yīng)該處于“暫停“狀態(tài),即線(xiàn)程不能向后執(zhí)行了,必需要拿到服務(wù)端給自己的結(jié)果后才能向后執(zhí)行
- 當(dāng)前線(xiàn)程怎么讓它“暫停”,等結(jié)果回來(lái)后,再向后執(zhí)行?
- 正如前面所說(shuō),Socket通信是一個(gè)全雙工的方式,如果有多個(gè)線(xiàn)程同時(shí)進(jìn)行遠(yuǎn)程方法調(diào)用,這時(shí)建立在client server之間的socket連接上會(huì)有很多雙方發(fā)送的消息傳遞,前后順序也可能是亂七八糟的,server處理完結(jié)果后,將結(jié)果消息發(fā)送給client,client收到很多消息,怎么知道哪個(gè)消息結(jié)果是原先哪個(gè)線(xiàn)程調(diào)用的?
- client一個(gè)線(xiàn)程調(diào)用遠(yuǎn)程接口,生成一個(gè)唯一的ID(比如一段隨機(jī)字符串,UUID等),Dubbo是使用AtomicLong從0開(kāi)始累計(jì)數(shù)字的
- 將打包的方法調(diào)用信息(如調(diào)用的接口名稱(chēng),方法名稱(chēng),參數(shù)值列表等),和處理結(jié)果的回調(diào)對(duì)象callback,全部封裝在一起,組成一個(gè)對(duì)象object
- 向?qū)iT(mén)存放調(diào)用信息的全局ConcurrentHashMap里面put(ID, object)
- 將ID和打包的方法調(diào)用信息封裝成一對(duì)象connRequest,使用IoSession.write(connRequest)異步發(fā)送出去
- 當(dāng)前線(xiàn)程再使用callback的get()方法試圖獲取遠(yuǎn)程返回的結(jié)果,在get()內(nèi)部,則使用synchronized獲取回調(diào)對(duì)象callback的鎖, 再先檢測(cè)是否已經(jīng)獲取到結(jié)果,如果沒(méi)有,然后調(diào)用callback的wait()方法,釋放callback上的鎖,讓當(dāng)前線(xiàn)程處于等待狀態(tài)。
- 服務(wù)端接收到請(qǐng)求并處理后,將結(jié)果(此結(jié)果中包含了前面的ID,即回傳)發(fā)送給客戶(hù)端,客戶(hù)端socket連接上專(zhuān)門(mén)監(jiān)聽(tīng)消息的線(xiàn)程收到消息,分析結(jié)果,取到ID,再?gòu)那懊娴腃oncurrentHashMap里面get(ID),從而找到callback,將方法調(diào)用結(jié)果設(shè)置到callback對(duì)象里。
- 監(jiān)聽(tīng)線(xiàn)程接著使用synchronized獲取回調(diào)對(duì)象callback的鎖(因?yàn)榍懊嬲{(diào)用過(guò)wait(),那個(gè)線(xiàn)程已釋放callback的鎖了),再notifyAll(),喚醒前面處于等待狀態(tài)的線(xiàn)程繼續(xù)執(zhí)行(callback的get()方法繼續(xù)執(zhí)行就能拿到調(diào)用結(jié)果了),至此,整個(gè)過(guò)程結(jié)束。
- 當(dāng)前線(xiàn)程怎么讓它“暫停”,等結(jié)果回來(lái)后,再向后執(zhí)行?
- 正如前面所說(shuō),Socket通信是一個(gè)全雙工的方式,如果有多個(gè)線(xiàn)程同時(shí)進(jìn)行遠(yuǎn)程方法調(diào)用,這時(shí)建立在client server之間的socket連接上會(huì)有很多雙方發(fā)送的消息傳遞,前后順序也可能是亂七八糟的,server處理完結(jié)果后,將結(jié)果消息發(fā)送給client,client收到很多消息,怎么知道哪個(gè)消息結(jié)果是原先哪個(gè)線(xiàn)程調(diào)用的?
關(guān)鍵代碼:
com.taobao.remoting.impl.DefaultClient.java //同步調(diào)用遠(yuǎn)程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //獲取結(jié)果時(shí)讓當(dāng)前線(xiàn)程等待,ResponseFuture其實(shí)就是前面說(shuō)的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; } |
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋鎖 while (!isDone) { // 是否有結(jié)果了 wait(); //沒(méi)結(jié)果是釋放鎖,讓當(dāng)前線(xiàn)程處于等待狀態(tài) } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客戶(hù)端收到服務(wù)端結(jié)果后,回調(diào)時(shí)相關(guān)方法,即設(shè)置isDone = true并notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //將遠(yuǎn)程調(diào)用結(jié)果設(shè)置到callback中來(lái) setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //獲取鎖,因?yàn)榍懊鎤ait()已經(jīng)釋放了callback的鎖了 notifyAll(); // 喚醒處于等待的線(xiàn)程 } } |
com.taobao.remoting.impl.DefaultConnection.java
// 用來(lái)存放請(qǐng)求和回調(diào)的MAP private final ConcurrentHashMap<Long, Object[]> requestResidents;
//發(fā)送消息出去 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { long requestId = connRequest.getId(); long waitBegin = System.currentTimeMillis(); long waitEnd = waitBegin + timeoutMs; Object[] queue = new Object[4]; int idx = 0; queue[idx++] = waitEnd; queue[idx++] = waitBegin; //用于記錄日志 queue[idx++] = connRequest; //用于記錄日志 queue[idx++] = callback; requestResidents.put(requestId, queue); // 記錄響應(yīng)隊(duì)列 write(connRequest);
// 埋點(diǎn)記錄等待響應(yīng)的Map的大小 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 1L); } public void write(final Object connectionMsg) { //mina里的IoSession.write()發(fā)送消息 WriteFuture writeFuture = ioSession.write(connectionMsg); // 注冊(cè)FutureListener,當(dāng)請(qǐng)求發(fā)送失敗后,能夠立即做出響應(yīng) writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); }
/** * 在得到響應(yīng)后,刪除對(duì)應(yīng)的請(qǐng)求隊(duì)列,并執(zhí)行回調(diào) * 調(diào)用者:MINA線(xiàn)程 */ public void putResponse(final ConnectionResponse connResp) { final long requestId = connResp.getRequestId(); Object[] queue = requestResidents.remove(requestId); if (null == queue) { Object appResp = connResp.getAppResponse(); String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); StringBuilder sb = new StringBuilder(); sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); sb.append("from [").append(connResp.getHost()).append("],"); sb.append("response type [").append(appRespClazz).append("]."); LOGGER.warn(sb.toString()); return; } int idx = 0; idx++; long waitBegin = (Long) queue[idx++]; ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ResponseCallback callback = (ResponseCallback) queue[idx++]; // ** 把回調(diào)任務(wù)交給業(yè)務(wù)提供的線(xiàn)程池執(zhí)行 ** Executor callbackExecutor = callback.getExecutor(); callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
long duration = System.currentTimeMillis() - waitBegin; // 實(shí)際讀響應(yīng)時(shí)間 logIfResponseError(connResp, duration, connRequest.getAppRequest()); } |
CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { final ConnectionResponse resp; final ResponseCallback callback; final Thread createThread;
CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { resp = _resp; callback = _cb; createThread = Thread.currentThread(); }
public void run() { // 預(yù)防這種情況:業(yè)務(wù)提供的Executor,讓調(diào)用者線(xiàn)程來(lái)執(zhí)行任務(wù) if (createThread == Thread.currentThread() && callback.getExecutor() != DIYExecutor.getInstance()) { StringBuilder sb = new StringBuilder(); sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); sb.append("Can not callback task on the network io thhread."); LOGGER.warn(sb.toString()); return; }
if (TRConstants.RESULT_SUCCESS == resp.getResult()) { callback.handleResponse(resp.getAppResponse()); //設(shè)置調(diào)用結(jié)果 } else { callback.onRemotingException(resp.getResult(), resp .getErrorMsg()); //處理調(diào)用異常 } } } |
另外:
1, 服務(wù)端在處理客戶(hù)端的消息,然后再處理時(shí),使用了線(xiàn)程池來(lái)并行處理,不用一個(gè)一個(gè)消息的處理
同樣,客戶(hù)端接收到服務(wù)端的消息,也是使用線(xiàn)程池來(lái)處理消息,再回調(diào)
轉(zhuǎn)載自:http://sunjun041640.blog.163.com/blog/static/256268322011111882453405/