Jack Jiang

          我的最新工程MobileIMSDK:http://git.oschina.net/jackjiang/MobileIMSDK
          posts - 499, comments - 13, trackbacks - 0, articles - 1

          本文作者芋艿,原題“使用 Netty 實(shí)現(xiàn) IM 聊天賊簡(jiǎn)單”,本底價(jià)有修訂和改動(dòng)。

          一、本文引言

          上篇《跟著源碼學(xué)IM(七):手把手教你用WebSocket打造Web端IM聊天》中,我們使用 WebSocket 實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 IM 功能,支持身份認(rèn)證、私聊消息、群聊消息。

          然后就有人發(fā)私信,希望使用純 Netty 實(shí)現(xiàn)一個(gè)類似的功能,因此就有了本文。

          注:源碼請(qǐng)從同步鏈接附件中下載,http://www.52im.net/thread-3489-1-1.html。

          學(xué)習(xí)交流:

          - 即時(shí)通訊/推送技術(shù)開(kāi)發(fā)交流5群:215477170 [推薦]

          - 移動(dòng)端IM開(kāi)發(fā)入門文章:《新手入門一篇就夠:從零開(kāi)發(fā)移動(dòng)端IM

          - 開(kāi)源IM框架源碼:https://github.com/JackJiang2011/MobileIMSDK

          本文同步發(fā)布于:http://www.52im.net/thread-3489-1-1.html

          二、知識(shí)準(zhǔn)備

          可能有人不知道 Netty 是什么,這里簡(jiǎn)單介紹下:

          Netty 是一個(gè) Java 開(kāi)源框架。Netty 提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。

          也就是說(shuō),Netty 是一個(gè)基于 NIO 的客戶、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡(jiǎn)單的開(kāi)發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶,服務(wù)端應(yīng)用。

          Netty 相當(dāng)簡(jiǎn)化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開(kāi)發(fā)過(guò)程,例如,TCP 和 UDP 的 Socket 服務(wù)開(kāi)發(fā)。

          以下是幾篇有關(guān)Netty的入門文章,值得一讀:

          如果你連Java的NIO都不知道是什么,下面的文章建議優(yōu)先讀一下:

          Netty源碼和API的在線閱讀地址:

          三、本文源碼

          本文完整代碼附件下載:請(qǐng)從同步鏈接附件中下載,http://www.52im.net/thread-3489-1-1.html。

          源碼的目錄結(jié)構(gòu),如下圖所示: 

          如上圖所示:

          • 1)lab-67-netty-demo-server 項(xiàng)目:搭建 Netty 服務(wù)端;
          • 2)lab-67-netty-demo-client 項(xiàng)目:搭建 Netty 客戶端;
          • 3)lab-67-netty-demo-common 項(xiàng)目:提供 Netty 的基礎(chǔ)封裝,提供消息的編解碼、分發(fā)的功能。

          另外,源碼中也會(huì)提供 Netty 常用功能的示例:

          • 1)心跳機(jī)制,實(shí)現(xiàn)服務(wù)端對(duì)客戶端的存活檢測(cè);
          • 2)斷線重連,實(shí)現(xiàn)客戶端對(duì)服務(wù)端的重新連接。

          不嗶嗶,直接開(kāi)干。

          五、通信協(xié)議

          在上一章中,我們實(shí)現(xiàn)了客戶端和服務(wù)端的連接功能。而本小節(jié),我們要讓它們兩能夠說(shuō)上話,即進(jìn)行數(shù)據(jù)的讀寫(xiě)。

          在日常項(xiàng)目的開(kāi)發(fā)中,前端和后端之間采用 HTTP 作為通信協(xié)議,使用文本內(nèi)容進(jìn)行交互,數(shù)據(jù)格式一般是 JSON。但是在 TCP 的世界里,我們需要自己基于二進(jìn)制構(gòu)建,構(gòu)建客戶端和服務(wù)端的通信協(xié)議。

          我們以客戶端向服務(wù)端發(fā)送消息來(lái)舉個(gè)例子,假設(shè)客戶端要發(fā)送一個(gè)登錄請(qǐng)求。

          對(duì)應(yīng)的類如下:

          public class AuthRequest {

              /** 用戶名 **/

              private String username;

              /** 密碼 **/

              private String password;

          }

          顯然:我們無(wú)法將一個(gè) Java 對(duì)象直接丟到 TCP Socket 當(dāng)中,而是需要將其轉(zhuǎn)換成 byte 字節(jié)數(shù)組,才能寫(xiě)入到 TCP Socket 中去。即,需要將消息對(duì)象通過(guò)序列化,轉(zhuǎn)換成 byte 字節(jié)數(shù)組。

          同時(shí):在服務(wù)端收到 byte 字節(jié)數(shù)組時(shí),需要將其又轉(zhuǎn)換成 Java 對(duì)象,即反序列化。不然,服務(wù)端對(duì)著一串 byte 字節(jié)處理個(gè)毛線?!

          友情提示:服務(wù)端向客戶端發(fā)消息,也是一樣的過(guò)程哈!

          序列化的工具非常多,例如說(shuō) Google 提供的 Protobuf,性能高效,且序列化出來(lái)的二進(jìn)制數(shù)據(jù)較小。Netty 對(duì) Protobuf 進(jìn)行集成,提供了相應(yīng)的編解碼器。

          如下圖所示: 

          但是考慮到很多可能對(duì) Protobuf 并不了解,因?yàn)樗鼘?shí)現(xiàn)序列化又增加額外學(xué)習(xí)成本。因此,仔細(xì)一個(gè)捉摸,還是采用 JSON 方式進(jìn)行序列化。可能有人會(huì)疑惑,JSON 不是將對(duì)象轉(zhuǎn)換成字符串嗎?嘿嘿,我們?cè)侔炎址D(zhuǎn)換成 byte 字節(jié)數(shù)組就可以啦~

          下面,我們新建 lab-67-netty-demo-common 項(xiàng)目,并在 codec 包下,實(shí)現(xiàn)我們自定義的通信協(xié)議。

          如下圖所示:

          5.1、Invocation

          創(chuàng)建 Invocation 類,通信協(xié)議的消息體。

          代碼如下:

          /**

           * 通信協(xié)議的消息體

           */

          public class Invocation {

              /**

               * 類型

               */

              private String type;

              /**

               * 消息,JSON 格式

               */

              private String message;

           

              // 空構(gòu)造方法

              public Invocation() {

              }

           

              public Invocation(String type, String message) {

                  this.type = type;

                  this.message = message;

              }

           

              public Invocation(String type, Message message) {

                  this.type = type;

                  this.message = JSON.toJSONString(message);

              }

           

              // ... 省略 setter、getter、toString 方法

          }

           type 屬性,類型,用于匹配對(duì)應(yīng)的消息處理器。如果類比 HTTP 協(xié)議,type 屬性相當(dāng)于請(qǐng)求地址。

           message 屬性,消息內(nèi)容,使用 JSON 格式。

          另外,Message 是我們定義的消息接口,代碼如下:

          public interface Message {

              // ... 空,作為標(biāo)記接口

          }

          5.2、粘包與拆包

          在開(kāi)始看 Invocation 的編解碼處理器之前,我們先了解下粘包與拆包的概念。

          5.2.1 產(chǎn)生原因

          產(chǎn)生粘包和拆包問(wèn)題的主要原因是,操作系統(tǒng)在發(fā)送 TCP 數(shù)據(jù)的時(shí)候,底層會(huì)有一個(gè)緩沖區(qū),例如 1024 個(gè)字節(jié)大小。

          如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較小,沒(méi)達(dá)到緩沖區(qū)大小,TCP 則會(huì)將多個(gè)請(qǐng)求合并為同一個(gè)請(qǐng)求進(jìn)行發(fā)送,這就形成了粘包問(wèn)題。

          例如說(shuō):在《詳解 Socket 編程 --- TCP_NODELAY 選項(xiàng)》文章中我們可以看到,在關(guān)閉 Nagle 算法時(shí),請(qǐng)求不會(huì)等待滿足緩沖區(qū)大小,而是盡快發(fā)出,降低延遲。

          如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較大,超過(guò)了緩沖區(qū)大小,TCP 就會(huì)將其拆分為多次發(fā)送,這就是拆包,也就是將一個(gè)大的包拆分為多個(gè)小包進(jìn)行發(fā)送。

          如下圖展示了粘包和拆包的一個(gè)示意圖,演示了粘包和拆包的三種情況: 

          如上圖所示:

          • 1)A 和 B 兩個(gè)包都剛好滿足 TCP 緩沖區(qū)的大小,或者說(shuō)其等待時(shí)間已經(jīng)達(dá)到 TCP 等待時(shí)長(zhǎng),從而還是使用兩個(gè)獨(dú)立的包進(jìn)行發(fā)送;
          • 2)A 和 B 兩次請(qǐng)求間隔時(shí)間內(nèi)較短,并且數(shù)據(jù)包較小,因而合并為同一個(gè)包發(fā)送給服務(wù)端;
          • 3)B 包比較大,因而將其拆分為兩個(gè)包 B_1 和 B_2 進(jìn)行發(fā)送,而這里由于拆分后的 B_2 比較小,其又與 A 包合并在一起發(fā)送。

          5.2.2 解決方案

          對(duì)于粘包和拆包問(wèn)題,常見(jiàn)的解決方案有三種。

           客戶端在發(fā)送數(shù)據(jù)包的時(shí)候,每個(gè)包都固定長(zhǎng)度。比如 1024 個(gè)字節(jié)大小,如果客戶端發(fā)送的數(shù)據(jù)長(zhǎng)度不足 1024 個(gè)字節(jié),則通過(guò)補(bǔ)充空格的方式補(bǔ)全到指定長(zhǎng)度。

          這種方式,暫時(shí)沒(méi)有找到采用這種方式的案例。

           客戶端在每個(gè)包的末尾使用固定的分隔符。例如 \r\n,如果一個(gè)包被拆分了,則等待下一個(gè)包發(fā)送過(guò)來(lái)之后找到其中的 \r\n,然后對(duì)其拆分后的頭部部分與前一個(gè)包的剩余部分進(jìn)行合并,這樣就得到了一個(gè)完整的包。具體的案例,有 HTTP、WebSocket、Redis。

           將消息分為頭部和消息體,在頭部中保存有當(dāng)前整個(gè)消息的長(zhǎng)度,只有在讀取到足夠長(zhǎng)度的消息之后才算是讀到了一個(gè)完整的消息。

          友情提示:方案 ③ 是 ① 的升級(jí)版,動(dòng)態(tài)長(zhǎng)度。

          本文將采用這種方式,在每次 Invocation 序列化成字節(jié)數(shù)組寫(xiě)入 TCP Socket 之前,先將字節(jié)數(shù)組的長(zhǎng)度寫(xiě)到其中。

          如下圖所示: 

          5.3、InvocationEncoder

          創(chuàng)建 InvocationEncoder 類,實(shí)現(xiàn)將 Invocation 序列化,并寫(xiě)入到 TCP Socket 中。

          代碼如下:

          public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

           

              private Logger logger = LoggerFactory.getLogger(getClass());

           

              @Override

              protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {

                  // <2.1> 將 Invocation 轉(zhuǎn)換成 byte[] 數(shù)組

                  byte[] content = JSON.toJSONBytes(invocation);

                  // <2.2> 寫(xiě)入 length

                  out.writeInt(content.length);

                  // <2.3> 寫(xiě)入內(nèi)容

                  out.writeBytes(content);

                  logger.info("[encode][連接({}) 編碼了一條消息({})]", ctx.channel().id(), invocation.toString());

              }

          }

           MessageToByteEncoder 是 Netty 定義的編碼 ChannelHandler 抽象類,將泛型 消息轉(zhuǎn)換成字節(jié)數(shù)組。

           #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,進(jìn)行編碼的邏輯。

          <2.1> 處,調(diào)用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,將 Invocation 轉(zhuǎn)換成 字節(jié)數(shù)組。

          <2.2> 處,將字節(jié)數(shù)組的長(zhǎng)度,寫(xiě)入到 TCP Socket 當(dāng)中。這樣,后續(xù)「5.4 InvocationDecoder」可以根據(jù)該長(zhǎng)度,解析到消息,解決粘包和拆包的問(wèn)題。

          友情提示:MessageToByteEncoder 會(huì)最終將 ByteBuf out 寫(xiě)到 TCP Socket 中。

          <2.3> 處,將字節(jié)數(shù)組,寫(xiě)入到 TCP Socket 當(dāng)中。

          5.4、InvocationDecoder

          創(chuàng)建 InvocationDecoder 類,實(shí)現(xiàn)從 TCP Socket 讀取字節(jié)數(shù)組,反序列化成 Invocation。

          代碼如下: 

           ByteToMessageDecoder 是 Netty 定義的解碼 ChannelHandler 抽象類,在 TCP Socket 讀取到新數(shù)據(jù)時(shí),觸發(fā)進(jìn)行解碼。

           在 <2.1>、<2.2>、<2.3> 處,從 TCP Socket 中讀取長(zhǎng)度。

           在 <3.1>、<3.2>、<3.3> 處,從 TCP Socket 中讀取字節(jié)數(shù)組,并反序列化成 Invocation 對(duì)象。

          最終,添加 List<Object> out 中,交給后續(xù)的 ChannelHandler 進(jìn)行處理。稍后,我們將在「6. 消息分發(fā)」小結(jié)中,會(huì)看到 MessageDispatcher 將 Invocation 分發(fā)到其對(duì)應(yīng)的 MessageHandler 中,進(jìn)行業(yè)務(wù)邏輯的執(zhí)行。

          5.5、引入依賴

          創(chuàng)建 pom.xml 文件,引入 Netty、FastJSON 等等依賴。

          5.6、本章小結(jié)

          至此,我們已經(jīng)完成通信協(xié)議的定義、編解碼的邏輯,是不是蠻有趣的?!

          另外,我們?cè)?NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代碼中,將編解碼器添加到其中。

          如下圖所示: 

          六、消息分發(fā)

          在 SpringMVC 中,DispatcherServlet 會(huì)根據(jù)請(qǐng)求地址、方法等,將請(qǐng)求分發(fā)到匹配的 Controller 的 Method 方法上。

          在 lab-67-netty-demo-client 項(xiàng)目的 dispatcher 包中,我們創(chuàng)建了 MessageDispatcher 類,實(shí)現(xiàn)和 DispatcherServlet 類似的功能,將 Invocation 分發(fā)到其對(duì)應(yīng)的 MessageHandler 中,進(jìn)行業(yè)務(wù)邏輯的執(zhí)行。 

          下面,我們來(lái)看看具體的代碼實(shí)現(xiàn)。

          6.1、Message

          創(chuàng)建 Message 接口,定義消息的標(biāo)記接口。

          代碼如下:

          public interface Message {

          }

          下圖,是我們涉及到的 Message 實(shí)現(xiàn)類。

          如下圖所示:

          6.2、MessageHandler

          創(chuàng)建 MessageHandler 接口,消息處理器接口。

          代碼如下:

          public interface MessageHandler<T extendsMessage> {

              /**

               * 執(zhí)行處理消息

               *

               * @param channel 通道

               * @param message 消息

               */

              voide xecute(Channel channel, T message);

           

              /**

               * @return 消息類型,即每個(gè) Message 實(shí)現(xiàn)類上的 TYPE 靜態(tài)字段

               */

              String getType();

          }

          如上述代碼所示:

          • 1)定義了泛型 <T> ,需要是 Message 的實(shí)現(xiàn)類;
          • 2)定義的兩個(gè)接口方法,自己看下注釋哈。

          下圖,是我們涉及到的 MessageHandler 實(shí)現(xiàn)類。

          如下圖所示: 

          6.3、MessageHandlerContainer

          創(chuàng)建 MessageHandlerContainer 類,作為 MessageHandler 的容器。

          代碼如下: 

           實(shí)現(xiàn) InitializingBean 接口,在 #afterPropertiesSet() 方法中,掃描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

           在 #getMessageHandler(String type) 方法中,獲得類型對(duì)應(yīng)的 MessageHandler 對(duì)象。稍后,我們會(huì)在 MessageDispatcher 調(diào)用該方法。

           在 #getMessageClass(MessageHandler handler) 方法中,通過(guò) MessageHandler 中,通過(guò)解析其類上的泛型,獲得消息類型對(duì)應(yīng)的 Class 類。這是參考 rocketmq-spring 項(xiàng)目的 DefaultRocketMQListenerContainer#getMessageType() 方法,進(jìn)行略微修改。

          6.4、MessageDispatcher

          創(chuàng)建 MessageDispatcher 類,將 Invocation 分發(fā)到其對(duì)應(yīng)的 MessageHandler 中,進(jìn)行業(yè)務(wù)邏輯的執(zhí)行。

          代碼如下:

          @ChannelHandler.Sharable

          public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

           

              @Autowired

              private MessageHandlerContainer messageHandlerContainer;

           

              private final ExecutorService executor =  Executors.newFixedThreadPool(200);

           

              @Override

              protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {

                  // <3.1> 獲得 type 對(duì)應(yīng)的 MessageHandler 處理器

                  MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());

                  // 獲得  MessageHandler 處理器的消息類

                  Class<? extendsMessage> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);

                  // <3.2> 解析消息

                  Message message = JSON.parseObject(invocation.getMessage(), messageClass);

                  // <3.3> 執(zhí)行邏輯

                  executor.submit(newRunnable() {

           

                      @Override

                      public void run() {

                          // noinspection unchecked

                          messageHandler.execute(ctx.channel(), message);

                      }

                  });

              }

          }

          ① 在類上添加 @ChannelHandler.Sharable 注解,標(biāo)記這個(gè) ChannelHandler 可以被多個(gè) Channel 使用。

          ② SimpleChannelInboundHandler 是 Netty 定義的消息處理 ChannelHandler 抽象類,處理消息的類型是 <I> 泛型時(shí)。

          ③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,處理消息,進(jìn)行分發(fā)。

          <3.1> 處,調(diào)用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,獲得 Invocation 的 type 對(duì)應(yīng)的 MessageHandler 處理器。

          然后,調(diào)用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,獲得  MessageHandler 處理器的消息類。

          <3.2> 處,調(diào)用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,將 Invocation 的 message 解析成 MessageHandler 對(duì)應(yīng)的消息對(duì)象。

          <3.3> 處,丟到線程池中,然后調(diào)用 MessageHandler 的 #execute(Channel channel, T message) 方法,執(zhí)行業(yè)務(wù)邏輯。

          注意:為什么要丟到 executor 線程池中呢?我們先來(lái)了解下 EventGroup 的線程模型。

          友情提示:在我們啟動(dòng) Netty 服務(wù)端或者客戶端時(shí),都會(huì)設(shè)置其 EventGroup。

          EventGroup 我們可以先簡(jiǎn)單理解成一個(gè)線程池,并且線程池的大小僅僅是 CPU 數(shù)量 * 2。每個(gè) Channel 僅僅會(huì)被分配到其中的一個(gè)線程上,進(jìn)行數(shù)據(jù)的讀寫(xiě)。并且,多個(gè) Channel 會(huì)共享一個(gè)線程,即使用同一個(gè)線程進(jìn)行數(shù)據(jù)的讀寫(xiě)。

          那么試著思考下,MessageHandler 的具體邏輯視線中,往往會(huì)涉及到 IO 處理,例如說(shuō)進(jìn)行數(shù)據(jù)庫(kù)的讀取。這樣,就會(huì)導(dǎo)致一個(gè) Channel 在執(zhí)行 MessageHandler 的過(guò)程中,阻塞了共享當(dāng)前線程的其它 Channel 的數(shù)據(jù)讀取。

          因此,我們?cè)谶@里創(chuàng)建了 executor 線程池,進(jìn)行 MessageHandler 的邏輯執(zhí)行,避免阻塞 Channel 的數(shù)據(jù)讀取。

          可能會(huì)有人說(shuō),我們是不是能夠把 EventGroup 的線程池設(shè)置大一點(diǎn),例如說(shuō) 200 呢?對(duì)于長(zhǎng)連接的 Netty 服務(wù)端,往往會(huì)有 1000 ~ 100000 的 Netty 客戶端連接上來(lái),這樣無(wú)論設(shè)置多大的線程池,都會(huì)出現(xiàn)阻塞數(shù)據(jù)讀取的情況。

          友情提示:executor 線程池,我們一般稱之為業(yè)務(wù)線程池或者邏輯線程池,顧名思義,就是執(zhí)行業(yè)務(wù)邏輯的。這樣的設(shè)計(jì)方式,目前 Dubbo 等等 RPC 框架,都采用這種方式。后續(xù),可以認(rèn)真閱讀下《【NIO 系列】——之 Reactor 模型》文章,進(jìn)一步理解。

          6.5、NettyServerConfig

          創(chuàng)建 NettyServerConfig 配置類,創(chuàng)建 MessageDispatcher 和 MessageHandlerContainer Bean。

          代碼如下:

          @Configuration

          public class NettyServerConfig {

           

              @Bean

              public MessageDispatcher messageDispatcher() {

                  return new MessageDispatcher();

              }

           

              @Bean

              public MessageHandlerContainer messageHandlerContainer() {

                  return new MessageHandlerContainer();

              }

          }

          6.6、NettyClientConfig

          創(chuàng)建 NettyClientConfig 配置類,創(chuàng)建 MessageDispatcher 和 MessageHandlerContainer Bean。

          代碼如下:

          @Configuration

          public class NettyClientConfig {

              @Bean

              public MessageDispatcher messageDispatcher() {

                  return new MessageDispatcher();

              }

              @Bean

              public MessageHandlerContainer messageHandlerContainer() {

                  return new MessageHandlerContainer();

              }

          }

          6.7、本章小結(jié)

          后續(xù),我們將在如下小節(jié),具體演示消息分發(fā)的使用。

          七、斷開(kāi)重連

          Netty 客戶端需要實(shí)現(xiàn)斷開(kāi)重連機(jī)制,解決各種情況下的斷開(kāi)情況。

          例如說(shuō):

          • 1)Netty 客戶端啟動(dòng)時(shí),Netty 服務(wù)端處于掛掉,導(dǎo)致無(wú)法連接上;
          • 2)在運(yùn)行過(guò)程中,Netty 服務(wù)端掛掉,導(dǎo)致連接被斷開(kāi);
          • 3)任一一端網(wǎng)絡(luò)抖動(dòng),導(dǎo)致連接異常斷開(kāi)。

          具體的代碼實(shí)現(xiàn)比較簡(jiǎn)單,只需要在兩個(gè)地方增加重連機(jī)制:

          • 1)Netty 客戶端啟動(dòng)時(shí),無(wú)法連接 Netty 服務(wù)端時(shí),發(fā)起重連;
          • 2)Netty 客戶端運(yùn)行時(shí),和 Netty 斷開(kāi)連接時(shí),發(fā)起重連。

          考慮到重連會(huì)存在失敗的情況,我們采用定時(shí)重連的方式,避免占用過(guò)多資源。

          7.1、具體代碼

          ① 在 NettyClient 中,提供 #reconnect() 方法,實(shí)現(xiàn)定時(shí)重連的邏輯。

          代碼如下:

          // NettyClient.java

          public void reconnect() {

              eventGroup.schedule(new Runnable() {

                  @Override

                  publicvoidrun() {

                      logger.info("[reconnect][開(kāi)始重連]");

                      try{

                          start();

                      } catch(InterruptedException e) {

                          logger.error("[reconnect][重連失敗]", e);

                      }

                  }

              }, RECONNECT_SECONDS, TimeUnit.SECONDS);

              logger.info("[reconnect][{} 秒后將發(fā)起重連]", RECONNECT_SECONDS);

          }

          通過(guò)調(diào)用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,實(shí)現(xiàn)定時(shí)邏輯。而在內(nèi)部的具體邏輯,調(diào)用 NettyClient 的 #start() 方法,發(fā)起連接 Netty 服務(wù)端。

          又因?yàn)?NettyClient 在 #start() 方法在連接 Netty 服務(wù)端失敗時(shí),又會(huì)調(diào)用 #reconnect() 方法,從而再次發(fā)起定時(shí)重連。如此循環(huán)反復(fù),知道 Netty 客戶端連接上 Netty 服務(wù)端。

          如下圖所示: 

          ② 在 NettyClientHandler 中,實(shí)現(xiàn) #channelInactive(ChannelHandlerContext ctx) 方法,在發(fā)現(xiàn)和 Netty 服務(wù)端斷開(kāi)時(shí),調(diào)用 Netty Client 的 #reconnect() 方法,發(fā)起重連。

          代碼如下:

          // NettyClientHandler.java

          @Override

          public void channelInactive(ChannelHandlerContext ctx) throws Exception {

              // 發(fā)起重連

              nettyClient.reconnect();

              // 繼續(xù)觸發(fā)事件

              super.channelInactive(ctx);

          }

          7.2、簡(jiǎn)單測(cè)試

          ① 啟動(dòng) Netty Client,不要啟動(dòng) Netty Server,控制臺(tái)打印日志如下圖: 

          可以看到 Netty Client 在連接失敗時(shí),不斷發(fā)起定時(shí)重連。

          ② 啟動(dòng) Netty Server,控制臺(tái)打印如下圖: 

          可以看到 Netty Client 成功重連上 Netty Server。

          八、心跳機(jī)制與空閑檢測(cè)

          我們可以了解到 TCP 自帶的空閑檢測(cè)機(jī)制,默認(rèn)是 2 小時(shí)。這樣的檢測(cè)機(jī)制,從系統(tǒng)資源層面上來(lái)說(shuō)是可以接受的。

          但是在業(yè)務(wù)層面,如果 2 小時(shí)才發(fā)現(xiàn)客戶端與服務(wù)端的連接實(shí)際已經(jīng)斷開(kāi),會(huì)導(dǎo)致中間非常多的消息丟失,影響客戶的使用體驗(yàn)。

          因此,我們需要在業(yè)務(wù)層面,自己實(shí)現(xiàn)空閑檢測(cè),保證盡快發(fā)現(xiàn)客戶端與服務(wù)端實(shí)際已經(jīng)斷開(kāi)的情況。

          實(shí)現(xiàn)邏輯如下:

          • 1)服務(wù)端發(fā)現(xiàn) 180 秒未從客戶端讀取到消息,主動(dòng)斷開(kāi)連接;
          • 2)客戶端發(fā)現(xiàn) 180 秒未從服務(wù)端讀取到消息,主動(dòng)斷開(kāi)連接。

          考慮到客戶端和服務(wù)端之間并不是一直有消息的交互,所以我們需要增加心跳機(jī)制。

          邏輯如下:

          • 1)客戶端每 60 秒向服務(wù)端發(fā)起一次心跳消息,保證服務(wù)端可以讀取到消息;
          • 2)服務(wù)端在收到心跳消息時(shí),回復(fù)客戶端一條確認(rèn)消息,保證客戶端可以讀取到消息。

          友情提示:

          為什么是 180 秒?可以加大或者減小,看自己希望多快檢測(cè)到連接異常。過(guò)短的時(shí)間,會(huì)導(dǎo)致心跳過(guò)于頻繁,占用過(guò)多資源。

          為什么是 60 秒?三次機(jī)會(huì),確認(rèn)是否心跳超時(shí)。

          雖然聽(tīng)起來(lái)有點(diǎn)復(fù)雜,但是實(shí)現(xiàn)起來(lái)并不復(fù)雜哈。

          8.1、服務(wù)端的空閑檢測(cè)

          在 NettyServerHandlerInitializer 中,我們添加了一個(gè) ReadTimeoutHandler 處理器,它在超過(guò)指定時(shí)間未從對(duì)端讀取到數(shù)據(jù),會(huì)拋出 ReadTimeoutException 異常。

          如下圖所示:

          通過(guò)這樣的方式,實(shí)現(xiàn)服務(wù)端發(fā)現(xiàn) 180 秒未從客戶端讀取到消息,主動(dòng)斷開(kāi)連接。

          8.2、客戶端的空閑檢測(cè)

          在 NettyClientHandlerInitializer 中,我們添加了一個(gè) ReadTimeoutHandler 處理器,它在超過(guò)指定時(shí)間未從對(duì)端讀取到數(shù)據(jù),會(huì)拋出 ReadTimeoutException 異常。

          如下圖所示: 

          通過(guò)這樣的方式,實(shí)現(xiàn)客戶端發(fā)現(xiàn) 180 秒未從服務(wù)端讀取到消息,主動(dòng)斷開(kāi)連接。

          8.3、心跳機(jī)制

          Netty 提供了 IdleStateHandler 處理器,提供空閑檢測(cè)的功能,在 Channel 的讀或者寫(xiě)空閑時(shí)間太長(zhǎng)時(shí),將會(huì)觸發(fā)一個(gè) IdleStateEvent 事件。

          這樣,我們只需要在 NettyClientHandler 處理器中,在接收到 IdleStateEvent 事件時(shí),客戶端向客戶端發(fā)送一次心跳消息。

          如下圖所示:

          其中,HeartbeatRequest 是心跳請(qǐng)求。

          同時(shí),我們?cè)诜?wù)端項(xiàng)目中,創(chuàng)建了一個(gè) HeartbeatRequestHandler 消息處理器,在收到客戶端的心跳請(qǐng)求時(shí),回復(fù)客戶端一條確認(rèn)消息。

          代碼如下:

          @Component

          public class HeartbeatRequestHandler implementsMessageHandler<HeartbeatRequest> {

              private Logger logger = LoggerFactory.getLogger(getClass());

           

              @Override

              public void execute(Channel channel, HeartbeatRequest message) {

                  logger.info("[execute][收到連接({}) 的心跳請(qǐng)求]", channel.id());

                  // 響應(yīng)心跳

                  HeartbeatResponse response = newHeartbeatResponse();

                  channel.writeAndFlush(newInvocation(HeartbeatResponse.TYPE, response));

              }

           

              @Override

              public String getType() {

                  return HeartbeatRequest.TYPE;

              }

          }

          其中,HeartbeatResponse 是心跳確認(rèn)響應(yīng)。

          8.4、簡(jiǎn)單測(cè)試

          啟動(dòng) Netty Server 服務(wù)端,再啟動(dòng) Netty Client 客戶端,耐心等待 60 秒后,可以看到心跳日志如下: 

          九、認(rèn)證邏輯

          從本小節(jié)開(kāi)始,我們就具體看看業(yè)務(wù)邏輯的處理示例。

          認(rèn)證的過(guò)程,如下圖所示:

          9.1、AuthRequest

          創(chuàng)建 AuthRequest 類,定義用戶認(rèn)證請(qǐng)求。

          代碼如下:

          public class AuthRequest implements Message {

              public static final String TYPE = "AUTH_REQUEST";

             /**

               * 認(rèn)證 Token

               */

              private String accessToken;

              // ... 省略 setter、getter、toString 方法

          }

          這里我們使用 accessToken 認(rèn)證令牌進(jìn)行認(rèn)證。

          因?yàn)橐话闱闆r下,我們使用 HTTP 進(jìn)行登錄系統(tǒng),然后使用登錄后的身份標(biāo)識(shí)(例如說(shuō) accessToken 認(rèn)證令牌),將客戶端和當(dāng)前用戶進(jìn)行認(rèn)證綁定。

          9.2、AuthResponse

          創(chuàng)建 AuthResponse 類,定義用戶認(rèn)證響應(yīng)。

          代碼如下:

          public class AuthResponse implements Message {

              public static final String TYPE = "AUTH_RESPONSE";

           

              /**

               * 響應(yīng)狀態(tài)碼

               */

              private Integer code;

              /**

               * 響應(yīng)提示

               */

              private String message;

           

              // ... 省略 setter、getter、toString 方法

          }

          9.3、AuthRequestHandler

          服務(wù)端...

          創(chuàng)建 AuthRequestHandler 類,為服務(wù)端處理客戶端的認(rèn)證請(qǐng)求。

          代碼如下: 

          代碼比較簡(jiǎn)單,看看 <1>、<2>、<3>、<4> 上的注釋。

          9.4、AuthResponseHandler

          客戶端...

          創(chuàng)建 AuthResponseHandler 類,為客戶端處理服務(wù)端的認(rèn)證響應(yīng)。

          代碼如下:

          @Component

          public class AuthResponseHandler implements MessageHandler<AuthResponse> {

           

              private Logger logger = LoggerFactory.getLogger(getClass());

           

              @Override

              public void execute(Channel channel, AuthResponse message) {

                  logger.info("[execute][認(rèn)證結(jié)果:{}]", message);

              }

           

              @Override

              public String getType() {

                  return AuthResponse.TYPE;

              }

          }

          打印個(gè)認(rèn)證結(jié)果,方便調(diào)試。

          9.5、TestController

          客戶端...

          創(chuàng)建 TestController 類,提供 /test/mock 接口,模擬客戶端向服務(wù)端發(fā)送請(qǐng)求。

          代碼如下:

          @RestController

          @RequestMapping("/test")

          public class TestController {

           

              @Autowired

              private NettyClient nettyClient;

           

              @PostMapping("/mock")

              public String mock(String type, String message) {

                  // 創(chuàng)建 Invocation 對(duì)象

                  Invocation invocation = new Invocation(type, message);

                  // 發(fā)送消息

                  nettyClient.send(invocation);

                  return "success";

              }

          }

          9.6、簡(jiǎn)單測(cè)試

          啟動(dòng) Netty Server 服務(wù)端,再啟動(dòng) Netty Client 客戶端,然后使用 Postman 模擬一次認(rèn)證請(qǐng)求。

          如下圖所示: 

          同時(shí),可以看到認(rèn)證成功的日志如下:

          十一、群聊邏輯

          群聊的過(guò)程,如下圖所示: 

          服務(wù)端負(fù)責(zé)將客戶端 A 發(fā)送的群聊消息,轉(zhuǎn)發(fā)給客戶端 A、B、C。

          友情提示:考慮到邏輯簡(jiǎn)潔,提供的本小節(jié)的示例并不是一個(gè)一個(gè)群,而是所有人在一個(gè)大的群聊中哈~

          11.1、ChatSendToAllRequest

          創(chuàng)建 ChatSendToOneRequest 類,發(fā)送給所有人的群聊消息的請(qǐng)求。

          代碼如下:

          public class ChatSendToAllRequest implements Message {

              public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

              /**

               * 消息編號(hào)

               */

              private String msgId;

              /**

               * 內(nèi)容

               */

              private String content;

           

              // ... 省略 setter、getter、toString 方法

          }

          PS:如果是正經(jīng)的群聊,會(huì)有一個(gè) groupId 字段,表示群編號(hào)。

          11.2、ChatSendToAllHandler

          服務(wù)端...

          創(chuàng)建 ChatSendToAllHandler 類,為服務(wù)端處理客戶端的群聊請(qǐng)求。

          代碼如下: 

          代碼比較簡(jiǎn)單,看看 <1>、<2> 上的注釋。

          11.3、簡(jiǎn)單測(cè)試

           啟動(dòng) Netty Server 服務(wù)端。

           啟動(dòng) Netty Client 客戶端 A。然后使用 Postman 模擬一次認(rèn)證請(qǐng)求(用戶為 yunai)。

          如下圖所示:

           啟動(dòng) Netty Client 客戶端 B。注意,需要設(shè)置 --server.port 端口為 8081,避免沖突。

           啟動(dòng) Netty Client 客戶端 C。注意,需要設(shè)置 --server.port 端口為 8082,避免沖突。 

           最后使用 Postman 模擬一次發(fā)送群聊消息。

          如下圖所示: 

          同時(shí),可以看到客戶端 A 群發(fā)給所有客戶端的日志如下:

          最后,要想系統(tǒng)地學(xué)習(xí)IM開(kāi)發(fā)的方方面面,請(qǐng)繼續(xù)閱讀:《新手入門一篇就夠:從零開(kāi)發(fā)移動(dòng)端IM

          附錄、系列文章

          跟著源碼學(xué)IM(一):手把手教你用Netty實(shí)現(xiàn)心跳機(jī)制、斷線重連機(jī)制

          跟著源碼學(xué)IM(二):自已開(kāi)發(fā)IM很難?手把手教你擼一個(gè)Andriod版IM

          跟著源碼學(xué)IM(三):基于Netty,從零開(kāi)發(fā)一個(gè)IM服務(wù)端

          跟著源碼學(xué)IM(四):拿起鍵盤就是干,教你徒手開(kāi)發(fā)一套分布式IM系統(tǒng)

          跟著源碼學(xué)IM(五):正確理解IM長(zhǎng)連接、心跳及重連機(jī)制,并動(dòng)手實(shí)現(xiàn)

          跟著源碼學(xué)IM(六):手把手教你用Go快速搭建高性能、可擴(kuò)展的IM系統(tǒng)

          跟著源碼學(xué)IM(七):手把手教你用WebSocket打造Web端IM聊天

          跟著源碼學(xué)IM(八):萬(wàn)字長(zhǎng)文,手把手教你用Netty打造IM聊天》(* 本文)

          本文已同步發(fā)布于“即時(shí)通訊技術(shù)圈”公眾號(hào)。

          ▲ 本文在公眾號(hào)上的鏈接是:點(diǎn)此進(jìn)入。同步發(fā)布鏈接是:http://www.52im.net/thread-3489-1-1.html



          作者:Jack Jiang (點(diǎn)擊作者姓名進(jìn)入Github)
          出處:http://www.52im.net/space-uid-1.html
          交流:歡迎加入即時(shí)通訊開(kāi)發(fā)交流群 215891622
          討論:http://www.52im.net/
          Jack Jiang同時(shí)是【原創(chuàng)Java Swing外觀工程BeautyEye】【輕量級(jí)移動(dòng)端即時(shí)通訊框架MobileIMSDK】的作者,可前往下載交流。
          本博文 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明出處(也可前往 我的52im.net 找到我)。


          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          Jack Jiang的 Mail: jb2011@163.com, 聯(lián)系QQ: 413980957, 微信: hellojackjiang
          主站蜘蛛池模板: 绥德县| 许昌市| 屯留县| 宝丰县| 甘孜县| 黔西| 南京市| 剑河县| 曲松县| 肇东市| 万年县| 华容县| 枝江市| 十堰市| 寻乌县| 仙桃市| 祁阳县| 霍邱县| 明溪县| 大港区| 铁力市| 报价| 祁阳县| 胶南市| 香河县| 吉木萨尔县| 翁牛特旗| 美姑县| 上饶市| 当雄县| 启东市| 鄂州市| 凤冈县| 玛纳斯县| 周宁县| 田东县| 都江堰市| 河间市| 章丘市| 安乡县| 淮阳县|