Jack Jiang

          我的最新工程MobileIMSDK:http://git.oschina.net/jackjiang/MobileIMSDK
          posts - 499, comments - 13, trackbacks - 0, articles - 1

          本文由竹子愛熊貓分享,原題“(十一)Netty實(shí)戰(zhàn)篇:基于Netty框架打造一款高性能的IM即時(shí)通訊程序”,本文有修訂和改動(dòng)。

          1、引言

          關(guān)于Netty網(wǎng)絡(luò)框架的內(nèi)容,前面已經(jīng)講了兩個(gè)章節(jié),但總歸來說難以真正掌握,畢竟只是對其中一個(gè)個(gè)組件進(jìn)行講解,很難讓諸位將其串起來形成一條線,所以本章中則會結(jié)合實(shí)戰(zhàn)案例,對Netty進(jìn)行更深層次的學(xué)習(xí)與掌握,實(shí)戰(zhàn)案例也并不難,一個(gè)非常樸素的IM聊天程序。

          原本打算做個(gè)多人斗地主練習(xí)程序,但那需要織入過多的業(yè)務(wù)邏輯,因此一方面會帶來不必要的理解難度,讓案例更為復(fù)雜化,另一方面代碼量也會偏多,所以最終依舊選擇實(shí)現(xiàn)基本的IM聊天程序,既簡單,又能加深對Netty的理解。

          技術(shù)交流:

          (本文已同步發(fā)布于:http://www.52im.net/thread-4530-1-1.html

          2、配套源碼

          本文配套源碼的開源托管地址是:

          3、知識準(zhǔn)備

          關(guān)于 Netty 是什么,這里簡單介紹下:

          Netty 是一個(gè) Java 開源框架。Netty 提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。

          也就是說,Netty 是一個(gè)基于 NIO 的客戶、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶,服務(wù)端應(yīng)用。

          Netty 相當(dāng)簡化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如,TCP 和 UDP 的 Socket 服務(wù)開發(fā)。

          有關(guān)Netty的入門文章:

          如果你連Java NIO都不知道,下面的文章建議優(yōu)先讀:

          Netty源碼和API 在線查閱地址:

          4、基于Netty設(shè)計(jì)通信協(xié)議

          協(xié)議,這玩意兒相信大家肯定不陌生了,簡單回顧一下協(xié)議的概念:網(wǎng)絡(luò)協(xié)議是指一種通信雙方都必須遵守的約定,兩個(gè)不同的端,按照一定的格式對數(shù)據(jù)進(jìn)行“編碼”,同時(shí)按照相同的規(guī)則進(jìn)行“解碼”,從而實(shí)現(xiàn)兩者之間的數(shù)據(jù)傳輸與通信。

          當(dāng)自己想要打造一款I(lǐng)M通信程序時(shí),對于消息的封裝、拆分也同樣需要設(shè)計(jì)一個(gè)協(xié)議,通信的兩端都必須遵守該協(xié)議工作,這也是實(shí)現(xiàn)通信程序的前提。

          但為什么需要通信協(xié)議呢?

          因?yàn)門CP/IP中是基于流的方式傳輸消息,消息與消息之間沒有邊界,而協(xié)議的目的則在于約定消息的樣式、邊界等。

          5、Redis通信的RESP協(xié)議參考學(xué)習(xí)

          不知大家是否還記得之前我聊到的RESP客戶端協(xié)議,這是Redis提供的一種客戶端通信協(xié)議。如果想要操作Redis,就必須遵守該協(xié)議的格式發(fā)送數(shù)據(jù)。

          這個(gè)協(xié)議特別簡單,如下:

          • 1)首先要求所有命令,都以*開頭,后面跟著具體的子命令數(shù)量,接著用換行符分割;
          • 2)接著需要先用$符號聲明每個(gè)子命令的長度,然后再用換行符分割;
          • 3)最后再拼接上具體的子命令,同樣用換行符分割。

          這樣描述有些令人難懂,那就直接看個(gè)案例,例如一條簡單set命令。

          如下:

          客戶端命令:

              setname ZhuZi

          轉(zhuǎn)變?yōu)镽ESP指令:

              *3

              $3

              set

              $4

              name

              $5

              ZhuZi

          按照Redis的規(guī)定,但凡滿足RESP協(xié)議的客戶端,都可以直接連接并操作Redis服務(wù)端,這也就意味著咱們可以直接通過Netty來手寫一個(gè)Redis客戶端。

          代碼如下:

          // 基于Netty、RESP協(xié)議實(shí)現(xiàn)的Redis客戶端

          publicclassRedisClient {

              // 換行符的ASCII碼

              staticfinalbyte[] LINE = {13, 10};

           

              publicstaticvoidmain(String[] args) {

                  EventLoopGroup worker = newNioEventLoopGroup();

                  Bootstrap client = newBootstrap();

           

                  try{

                      client.group(worker);

                      client.channel(NioSocketChannel.class);

                      client.handler(newChannelInitializer<SocketChannel>() {

                          @Override

                          protectedvoidinitChannel(SocketChannel socketChannel)

                                                                  throwsException {

                              ChannelPipeline pipeline = socketChannel.pipeline();

           

                              pipeline.addLast(newChannelInboundHandlerAdapter(){

           

                                  // 通道建立成功后調(diào)用:向Redis發(fā)送一條set命令

                                  @Override

                                  publicvoidchannelActive(ChannelHandlerContext ctx)

                                                                      throwsException {

                                      String command = "set name ZhuZi";

                                      ByteBuf buffer = respCommand(command);

                                      ctx.channel().writeAndFlush(buffer);

                                  }

           

                                  // Redis響應(yīng)數(shù)據(jù)時(shí)觸發(fā):打印Redis的響應(yīng)結(jié)果

                                  @Override

                                  publicvoidchannelRead(ChannelHandlerContext ctx,

                                                          Object msg) throwsException {

                                      // 接受Redis服務(wù)端執(zhí)行指令后的結(jié)果

                                      ByteBuf buffer = (ByteBuf) msg;

                                      System.out.println(buffer.toString(CharsetUtil.UTF_8));

                                  }

                              });

                          }

                      });

           

                      // 根據(jù)IP、端口連接Redis服務(wù)端

                      client.connect("192.168.12.129", 6379).sync();

                  } catch(Exception e){

                      e.printStackTrace();

                  }

              }

           

              privatestaticByteBuf respCommand(String command){

                  // 先對傳入的命令以空格進(jìn)行分割

                  String[] commands = command.split(" ");

                  ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

           

                  // 遵循RESP協(xié)議:先寫入指令的個(gè)數(shù)

                  buffer.writeBytes(("*"+ commands.length).getBytes());

                  buffer.writeBytes(LINE);

           

                  // 接著分別寫入每個(gè)指令的長度以及具體值

                  for(String s : commands) {

                      buffer.writeBytes(("$"+ s.length()).getBytes());

                      buffer.writeBytes(LINE);

                      buffer.writeBytes(s.getBytes());

                      buffer.writeBytes(LINE);

                  }

                  // 把轉(zhuǎn)換成RESP格式的命令返回

                  returnbuffer;

              }

          }

          在上述這個(gè)案例中,也僅僅只是通過respCommand()這個(gè)方法,對用戶輸入的指令進(jìn)行了轉(zhuǎn)換。同時(shí)在上面通過Netty,與Redis的地址、端口建立了連接。在連接建立成功后,就會向Redis發(fā)送一條轉(zhuǎn)換成RESP指令的set命令。接著等待Redis的響應(yīng)結(jié)果并輸出,如下:

          +OK

          因?yàn)檫@是一條寫指令,所以當(dāng)Redis收到執(zhí)行完成后,最終就會返回一個(gè)OK,大家也可直接去Redis中查詢,也依舊能夠查詢到剛剛寫入的name這個(gè)鍵值。

          6、HTTP超文本傳輸協(xié)議參考學(xué)習(xí)

          前面咱們自己針對于Redis的RESP協(xié)議,對用戶指令進(jìn)行了封裝,然后發(fā)往Redis執(zhí)行。

          但對于這些常用的協(xié)議,Netty早已提供好了現(xiàn)成的處理器,想要使用時(shí)無需從頭開發(fā),可以直接使用現(xiàn)成的處理器來實(shí)現(xiàn)。

          比如現(xiàn)在咱們可以基于Netty提供的處理器,實(shí)現(xiàn)一個(gè)簡單的HTTP服務(wù)器。

          代碼如下:

          // 基于Netty提供的處理器實(shí)現(xiàn)HTTP服務(wù)器

          publicclassHttpServer {

              publicstaticvoidmain(String[] args) throwsInterruptedException {

                  EventLoopGroup boss = newNioEventLoopGroup();

                  EventLoopGroup worker = newNioEventLoopGroup();

                  ServerBootstrap server = newServerBootstrap();

                  server

                      .group(boss,worker)

                      .channel(NioServerSocketChannel.class)

                      .childHandler(newChannelInitializer<NioSocketChannel>() {

                          @Override

                          protectedvoidinitChannel(NioSocketChannel ch) {

                              ChannelPipeline pipeline = ch.pipeline();

           

                              // 添加一個(gè)Netty提供的HTTP處理器

                              pipeline.addLast(newHttpServerCodec());

                              pipeline.addLast(newChannelInboundHandlerAdapter() {

                                  @Override

                                  publicvoidchannelRead(ChannelHandlerContext ctx,

                                                          Object msg) throwsException {

                                      // 在這里輸出一下消息的類型

                                      System.out.println("消息類型:"+ msg.getClass());

                                      super.channelRead(ctx, msg);

                                  }

                              });

                              pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {

                                  @Override

                                  protectedvoidchannelRead0(ChannelHandlerContext ctx,

                                                              HttpRequest msg) throwsException {

                                      System.out.println("客戶端的請求路徑:"+ msg.uri());

           

                                      // 創(chuàng)建一個(gè)響應(yīng)對象,版本號與客戶端保持一致,狀態(tài)碼為OK/200

                                      DefaultFullHttpResponse response =

                                              newDefaultFullHttpResponse(

                                                      msg.protocolVersion(),

                                                      HttpResponseStatus.OK);

           

                                      // 構(gòu)造響應(yīng)內(nèi)容

                                      byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();

           

                                      // 設(shè)置響應(yīng)頭:告訴客戶端本次響應(yīng)的數(shù)據(jù)長度

                                      response.headers().setInt(

                                          HttpHeaderNames.CONTENT_LENGTH,content.length);

                                      // 設(shè)置響應(yīng)主體

                                      response.content().writeBytes(content);

           

                                      // 向客戶端寫入響應(yīng)數(shù)據(jù)

                                      ctx.writeAndFlush(response);

                                  }

                              });

                          }

                      })

                      .bind("127.0.0.1",8888)

                      .sync();

              }

          }

          在該案例中,咱們就未曾手動(dòng)對HTTP的數(shù)據(jù)包進(jìn)行拆包處理了,而是在服務(wù)端的pipeline上添加了一個(gè)HttpServerCodec處理器,這個(gè)處理器是Netty官方提供的。

          其類繼承關(guān)系如下:

          publicfinalclassHttpServerCodec

              extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>

              implementsSourceCodec {

              // ......

          }

          觀察會發(fā)現(xiàn),該類繼承自CombinedChannelDuplexHandler這個(gè)組合類,它組合了編碼器、解碼器。

          這也就意味著HttpServerCodec即可以對客戶端的數(shù)據(jù)做解碼,也可以對服務(wù)端響應(yīng)的數(shù)據(jù)做編碼。

          同時(shí)除開添加了這個(gè)處理器外,在第二個(gè)處理器中打印了一下客戶端的消息類型,最后一個(gè)處理器中,對客戶端的請求做出了響應(yīng),其實(shí)也就是返回了一句話而已。

          此時(shí)在瀏覽器輸入http://127.0.0.1:8888/index.html,結(jié)果如下:

          消息類型:classio.netty.handler.codec.http.DefaultHttpRequest

          消息類型:classio.netty.handler.codec.http.LastHttpContent$1

          客戶端的請求路徑:/index.html

          此時(shí)來看結(jié)果,客戶端的請求會被解析成兩個(gè)部分:

          • 1)第一個(gè)是請求信息;
          • 2)第二個(gè)是主體信息。

          但按理來說瀏覽器發(fā)出的請求,屬于GET類型的請求,GET請求是沒有請求體信息的,但Netty依舊會解析成兩部分~,只不過GET請求的第二部分是空的。

          在第三個(gè)處理器中,咱們直接向客戶端返回了一個(gè)h1標(biāo)簽,同時(shí)也要記得在響應(yīng)頭里面,加上響應(yīng)內(nèi)容的長度信息,否則瀏覽器的加載圈,會一直不同的轉(zhuǎn)動(dòng),畢竟瀏覽器也不知道內(nèi)容有多長,就會一直反復(fù)加載,嘗試等待更多的數(shù)據(jù)。

          7、自定義消息傳輸協(xié)議

          7.1概述

          Netty除開提供了HTTP協(xié)議的處理器外,還提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列協(xié)議的實(shí)現(xiàn),具體定義位于io.netty.handler.codec這個(gè)包下,當(dāng)然,咱們也可以自己實(shí)現(xiàn)自定義協(xié)議,按照自己的邏輯對數(shù)據(jù)進(jìn)行編解碼處理。

          很多基于Netty開發(fā)的中間件/組件,其內(nèi)部基本上都開發(fā)了專屬的通信協(xié)議,以此來作為不同節(jié)點(diǎn)間通信的基礎(chǔ),所以解下來咱們基于Netty也來自己設(shè)計(jì)一款通信協(xié)議,這也會作為后續(xù)實(shí)現(xiàn)聊天程序時(shí)的基礎(chǔ)。

          所謂的協(xié)議設(shè)計(jì),其實(shí)僅僅只需要按照一定約束,實(shí)現(xiàn)編碼器與解碼器即可,發(fā)送方在發(fā)出數(shù)據(jù)之前,會經(jīng)過編碼器對數(shù)據(jù)進(jìn)行處理,而接收方在收到數(shù)據(jù)之前,則會由解碼器對數(shù)據(jù)進(jìn)行處理。

          7.2自定義協(xié)議的要素

          在自定義傳輸協(xié)議時(shí),咱們必然需要考慮幾個(gè)因素,如下:

          • 1)魔數(shù):用來第一時(shí)間判斷是否為自己需要的數(shù)據(jù)包;
          • 2)版本號:提高協(xié)議的拓展性,方便后續(xù)對協(xié)議進(jìn)行升級;
          • 3)序列化算法:消息正文具體該使用哪種方式進(jìn)行序列化傳輸,例如Json、ProtoBuf、JDK...;
          • 4)消息類型:第一時(shí)間判斷出當(dāng)前消息的類型;
          • 5)消息序號:為了實(shí)現(xiàn)雙工通信,客戶端和服務(wù)端之間收/發(fā)消息不會相互阻塞;
          • 6)正文長度:提供給LTC解碼器使用,防止解碼時(shí)出現(xiàn)粘包、半包的現(xiàn)象;
          • 7)消息正文:本次消息要傳輸?shù)木唧w數(shù)據(jù)。

          在設(shè)計(jì)協(xié)議時(shí),一個(gè)完整的協(xié)議應(yīng)該涵蓋上述所說的幾方面,這樣才能提供雙方通信時(shí)的基礎(chǔ)。

          基于上述幾個(gè)字段,能夠在第一時(shí)間內(nèi)判斷出:

          • 1)消息是否可用;
          • 2)當(dāng)前協(xié)議版本;
          • 3)消息的具體類型;
          • 4)消息的長度等各類信息。

          從而給后續(xù)處理器使用(自定義的協(xié)議規(guī)則本身就是一個(gè)編解碼處理器而已)。

          7.3自定義協(xié)議實(shí)戰(zhàn)

          前面簡單聊到過,所謂的自定義協(xié)議就是自己規(guī)定消息格式,以及自己實(shí)現(xiàn)編/解碼器對消息實(shí)現(xiàn)封裝/拆解,所以這里想要自定義一個(gè)消息協(xié)議,就只需要滿足前面兩個(gè)條件即可。

          因此實(shí)現(xiàn)如下:

          @ChannelHandler.Sharable

          publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {

           

              // 消息出站時(shí)會經(jīng)過的編碼方法(將原生消息對象封裝成自定義協(xié)議的消息格式)

              @Override

              protectedvoidencode(ChannelHandlerContext ctx, Message msg,

                                    List<Object> list) throwsException {

                  ByteBuf outMsg = ctx.alloc().buffer();

                  // 前五個(gè)字節(jié)作為魔數(shù)

                  byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};

                  outMsg.writeBytes(magicNumber);

                  // 一個(gè)字節(jié)作為版本號

                  outMsg.writeByte(1);

                  // 一個(gè)字節(jié)表示序列化方式  0:JDK、1:Json、2:ProtoBuf.....

                  outMsg.writeByte(0);

                  // 一個(gè)字節(jié)用于表示消息類型

                  outMsg.writeByte(msg.getMessageType());

                  // 四個(gè)字節(jié)表示消息序號

                  outMsg.writeInt(msg.getSequenceId());

           

                  // 使用Java-Serializable的方式對消息對象進(jìn)行序列化

                  ByteArrayOutputStream bos = newByteArrayOutputStream();

                  ObjectOutputStream oos = newObjectOutputStream(bos);

                  oos.writeObject(msg);

                  byte[] msgBytes = bos.toByteArray();

           

                  // 使用四個(gè)字節(jié)描述消息正文的長度

                  outMsg.writeInt(msgBytes.length);

                  // 將序列化后的消息對象作為消息正文

                  outMsg.writeBytes(msgBytes);

           

                  // 將封裝好的數(shù)據(jù)傳遞給下一個(gè)處理器

                  list.add(outMsg);

              }

           

              // 消息入站時(shí)會經(jīng)過的解碼方法(將自定義格式的消息轉(zhuǎn)變?yōu)榫唧w的消息對象)

              @Override

              protectedvoiddecode(ChannelHandlerContext ctx,

                                    ByteBuf inMsg, List<Object> list) throwsException {

                  // 讀取前五個(gè)字節(jié)得到魔數(shù)

                  byte[] magicNumber = newbyte[5];

                  inMsg.readBytes(magicNumber,0,5);

                  // 再讀取一個(gè)字節(jié)得到版本號

                  byteversion = inMsg.readByte();

                  // 再讀取一個(gè)字節(jié)得到序列化方式

                  byteserializableType = inMsg.readByte();

                  // 再讀取一個(gè)字節(jié)得到消息類型

                  bytemessageType = inMsg.readByte();

                  // 再讀取四個(gè)字節(jié)得到消息序號

                  intsequenceId = inMsg.readInt();

                  // 再讀取四個(gè)字節(jié)得到消息正文長度

                  intmessageLength = inMsg.readInt();

           

                  // 再根據(jù)正文長度讀取序列化后的字節(jié)正文數(shù)據(jù)

                  byte[] msgBytes = newbyte[messageLength];

                  inMsg.readBytes(msgBytes,0,messageLength);

           

                  // 對于讀取到的消息正文進(jìn)行反序列化,最終得到具體的消息對象

                  ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);

                  ObjectInputStream ois = newObjectInputStream(bis);

                  Message message = (Message) ois.readObject();

           

                  // 最終把反序列化得到的消息對象傳遞給后續(xù)的處理器

                  list.add(message);

              }

          }

          上面自定義的處理器中,繼承了MessageToMessageCodec類,主要負(fù)責(zé)將數(shù)據(jù)在原生ByteBuf與Message之間進(jìn)行相互轉(zhuǎn)換,而Message對象是自定義的消息對象,這里暫且無需過多關(guān)心。

          其中主要實(shí)現(xiàn)了兩個(gè)方法:

          • 1)encode():出站時(shí)會經(jīng)過的編碼方法,會將原生消息對象按自定義的協(xié)議封裝成對應(yīng)的字節(jié)數(shù)據(jù);
          • 2)decode():入站時(shí)會經(jīng)過的解碼方法,會將協(xié)議格式的字節(jié)數(shù)據(jù),轉(zhuǎn)變?yōu)榫唧w的消息對象。

          上述自定義的協(xié)議,也就是一定規(guī)則的字節(jié)數(shù)據(jù),每條消息數(shù)據(jù)的組成如下:

          • 1)魔數(shù):使用第1~5個(gè)字節(jié)來描述,這個(gè)魔數(shù)值可以按自己的想法自定義;
          • 2)版本號:使用第6個(gè)字節(jié)來描述,不同數(shù)字表示不同版本;
          • 3)序列化算法:使用第7個(gè)字節(jié)來描述,不同數(shù)字表示不同序列化方式;
          • 4)消息類型:使用第8個(gè)字節(jié)來描述,不同的消息類型使用不同數(shù)字表示;
          • 5)消息序號:使用第9~12個(gè)字節(jié)來描述,其實(shí)就是一個(gè)四字節(jié)的整數(shù);
          • 6)正文長度:使用第13~16個(gè)字節(jié)來描述,也是一個(gè)四字節(jié)的整數(shù);
          • 7)消息正文:長度不固定,根據(jù)每次具體發(fā)送的數(shù)據(jù)來決定。

          在其中,為了實(shí)現(xiàn)簡單,這里的序列化方式,則采用的是JDK默認(rèn)的Serializable接口方式,但這種方式生成的對象字節(jié)較大,實(shí)際情況中最好還是選擇谷歌的ProtoBuf方式,這種算法屬于序列化算法中,性能最佳的一種落地實(shí)現(xiàn)。

          當(dāng)然,這個(gè)自定義的協(xié)議是提供給后續(xù)的聊天業(yè)務(wù)使用的,但這種實(shí)戰(zhàn)型的內(nèi)容分享,基本上代碼量較高,所以大家看起來會有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑馬Netty視頻教程》二次改良的,因此如若感覺文字描述較為枯燥,可直接點(diǎn)擊前面給出的鏈接,觀看P101~P121視頻進(jìn)行學(xué)習(xí)。

          最后來觀察一下,大家會發(fā)現(xiàn),在咱們定義的這個(gè)協(xié)議編解碼處理器上,存在著一個(gè)@ChannelHandler.Sharable注解,這個(gè)注解的作用是干嗎的呢?其實(shí)很簡單,用來標(biāo)識當(dāng)前處理器是否可在多線程環(huán)境下使用,如果帶有該注解的處理器,則表示可以在多個(gè)通道間共用,因此只需要?jiǎng)?chuàng)建一個(gè)即可,反之同理,如果不帶有該注解的處理器,則每個(gè)通道需要單獨(dú)創(chuàng)建使用。

          PS:如果你想系統(tǒng)學(xué)習(xí)Protobuf,可以從以下文章入手:

          如何選擇即時(shí)通訊應(yīng)用的數(shù)據(jù)傳輸格式

          強(qiáng)列建議將Protobuf作為你的即時(shí)通訊應(yīng)用數(shù)據(jù)傳輸格式

          IM通訊協(xié)議專題學(xué)習(xí)(一):Protobuf從入門到精通,一篇就夠!

          IM通訊協(xié)議專題學(xué)習(xí)(二):快速理解Protobuf的背景、原理、使用、優(yōu)缺點(diǎn)

          IM通訊協(xié)議專題學(xué)習(xí)(三):由淺入深,從根上理解Protobuf的編解碼原理

          IM通訊協(xié)議專題學(xué)習(xí)(四):從Base64到Protobuf,詳解Protobuf的數(shù)據(jù)編碼原理

          IM通訊協(xié)議專題學(xué)習(xí)(八):金蝶隨手記團(tuán)隊(duì)的Protobuf應(yīng)用實(shí)踐(原理篇)

          8、實(shí)戰(zhàn)要點(diǎn)1:IM程序的用戶模塊

          8.1概述

          聊天、聊天,自然是得先有人,然后才能進(jìn)行聊天溝通。與QQ、微信類似,如果你想要使用某款聊天程序時(shí),前提都得是先具備一個(gè)對應(yīng)的賬戶才行。

          因此在咱們設(shè)計(jì)IM系統(tǒng)之處,那也需要對應(yīng)的用戶功能實(shí)現(xiàn)。但這里為了簡單,同樣不再結(jié)合數(shù)據(jù)庫實(shí)現(xiàn)完整的用戶模塊了,而是基于內(nèi)存實(shí)現(xiàn)用戶的管理。

          如下:

          publicinterfaceUserService {

              booleanlogin(String username, String password);

          }

          這是用戶模塊的頂層接口,僅僅只提供了一個(gè)登錄接口,關(guān)于注冊、鑒權(quán)、等級.....等一系列功能,大家感興趣的可在后續(xù)進(jìn)行拓展實(shí)現(xiàn),接著來看看該接口的實(shí)現(xiàn)類。

          如下:

          publicclassUserServiceMemoryImpl implementsUserService {

              privateMap<String, String> allUserMap = newConcurrentHashMap<>();

           

              {

                  // 在代碼塊中對用戶列表進(jìn)行初始化,向其中添加了兩個(gè)用戶信息

                  allUserMap.put("ZhuZi", "123");

                  allUserMap.put("XiongMao", "123");

              }

           

              @Override

              publicbooleanlogin(String username, String password) {

                  String pass = allUserMap.get(username);

                  if(pass == null) {

                      returnfalse;

                  }

                  returnpass.equals(password);

              }

          }

          這個(gè)實(shí)現(xiàn)類并未結(jié)合數(shù)據(jù)庫來實(shí)現(xiàn),而是僅僅在程序啟動(dòng)時(shí),通過代碼塊的方式,加載了ZhuZi、XiongMao兩個(gè)用戶信息并放入內(nèi)存的Map容器中,這里有興趣的小伙伴,可自行將Map容器換成數(shù)據(jù)庫的表即可。

          其中實(shí)現(xiàn)的login()登錄接口尤為簡單,僅僅只是判斷了一下有沒有對應(yīng)用戶,如果有的話則看看密碼是否正確,正確返回true,密碼錯(cuò)誤則返回false。是的,我所寫的登錄功能就是這么簡單,走個(gè)簡單的過場,哈哈哈~

          8.2服務(wù)端、客戶端的基礎(chǔ)架構(gòu)

          基本的用戶模塊有了,但這里還未曾套入具體實(shí)現(xiàn),因此先簡單的搭建出服務(wù)端、客戶端的架構(gòu),然后再基于構(gòu)建好的架構(gòu)實(shí)現(xiàn)基礎(chǔ)的用戶登錄功能。

          服務(wù)端的基礎(chǔ)搭建如下:

          publicclassChatServer {

              publicstaticvoidmain(String[] args) {

                  NioEventLoopGroup boss = newNioEventLoopGroup();

                  NioEventLoopGroup worker = newNioEventLoopGroup();

           

                  ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();

           

                  try{

                      ServerBootstrap serverBootstrap = newServerBootstrap();

                      serverBootstrap.channel(NioServerSocketChannel.class);

                      serverBootstrap.group(boss, worker);

                      serverBootstrap.childHandler(newChannelInitializer<SocketChannel>() {

                          @Override

                          protectedvoidinitChannel(SocketChannel ch) throwsException {

                              ch.pipeline().addLast(MESSAGE_CODEC);

                          }

                      });

           

                      Channel channel = serverBootstrap.bind(8888).sync().channel();

                      channel.closeFuture().sync();

                  } catch(InterruptedException e) {

                      System.out.println("服務(wù)端出現(xiàn)錯(cuò)誤:"+ e);

                  } finally{

                      boss.shutdownGracefully();

                      worker.shutdownGracefully();

                  }

              }

          }

          服務(wù)端的代碼目前很簡單,僅僅只是裝載了一個(gè)自己的協(xié)議編/解碼處理器,然后就是一些老步驟,不再過多的重復(fù)贅述,接著再來搭建一個(gè)簡單的客戶端。

          代碼實(shí)現(xiàn)如下:

          publicclassChatClient {

              publicstaticvoidmain(String[] args) {

                  NioEventLoopGroup group = newNioEventLoopGroup();

           

                  ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();

           

                  try{

                      Bootstrap bootstrap = newBootstrap();

                      bootstrap.channel(NioSocketChannel.class);

                      bootstrap.group(group);

                      bootstrap.handler(newChannelInitializer<SocketChannel>() {

                          @Override

                          protectedvoidinitChannel(SocketChannel ch) throwsException {

                              ch.pipeline().addLast(MESSAGE_CODEC);

                          }

                      });

                      Channel channel = bootstrap.connect("localhost", 8888).sync().channel();

                      channel.closeFuture().sync();

                  } catch(Exception e) {

                      System.out.println("客戶端出現(xiàn)錯(cuò)誤:"+ e);

                  } finally{

                      group.shutdownGracefully();

                  }

              }

          }

          目前僅僅只是與服務(wù)端建立了連接,然后裝載了一個(gè)自定義的編解碼器,到這里就搭建了最基本的服務(wù)端、客戶端的基礎(chǔ)架構(gòu),接著來基于它實(shí)現(xiàn)簡單的登錄功能。

          8.3用戶登錄功能的實(shí)現(xiàn)

          對于登錄功能,由于需要在服務(wù)端與客戶端之間傳輸數(shù)據(jù),因此咱們可以設(shè)計(jì)一個(gè)消息對象,但由于后續(xù)單聊、群聊都需要發(fā)送不同的消息格式,因此先設(shè)計(jì)出一個(gè)父類。

          如下:

          publicabstractclassMessage implementsSerializable {

           

              privateintsequenceId;

              privateintmessageType;

           

           

              @Override

              publicString toString() {

                  return"Message{"+

                          "sequenceId="+ sequenceId +

                          ", messageType="+ messageType +

                          '}';

              }

           

              publicintgetSequenceId() {

                  returnsequenceId;

              }

           

              publicvoidsetSequenceId(intsequenceId) {

                  this.sequenceId = sequenceId;

              }

           

              publicvoidsetMessageType(intmessageType) {

                  this.messageType = messageType;

              }

           

              publicabstractintgetMessageType();

           

              publicstaticfinalintLoginRequestMessage = 0;

              publicstaticfinalintLoginResponseMessage = 1;

              publicstaticfinalintChatRequestMessage = 2;

              publicstaticfinalintChatResponseMessage = 3;

              publicstaticfinalintGroupCreateRequestMessage = 4;

              publicstaticfinalintGroupCreateResponseMessage = 5;

              publicstaticfinalintGroupJoinRequestMessage = 6;

              publicstaticfinalintGroupJoinResponseMessage = 7;

              publicstaticfinalintGroupQuitRequestMessage = 8;

              publicstaticfinalintGroupQuitResponseMessage = 9;

              publicstaticfinalintGroupChatRequestMessage = 10;

              publicstaticfinalintGroupChatResponseMessage = 11;

              publicstaticfinalintGroupMembersRequestMessage = 12;

              publicstaticfinalintGroupMembersResponseMessage = 13;

              publicstaticfinalintPingMessage = 14;

              publicstaticfinalintPongMessage = 15;

          }

          在這個(gè)消息父類中,定義了多種消息類型的狀態(tài)碼,不同的消息類型對應(yīng)不同數(shù)字,同時(shí)其中還設(shè)計(jì)了一個(gè)抽象方法,即getMessageType(),該方法交給具體的子類實(shí)現(xiàn),每個(gè)子類返回各自的消息類型,為了方便后續(xù)拓展,這里又創(chuàng)建了一個(gè)抽象類作為中間類。

          如下:

          publicabstractclassAbstractResponseMessage extendsMessage {

              privatebooleansuccess;

              privateString reason;

           

              publicAbstractResponseMessage() {

              }

           

              publicAbstractResponseMessage(booleansuccess, String reason) {

                  this.success = success;

                  this.reason = reason;

              }

           

              @Override

              publicString toString() {

                  return"AbstractResponseMessage{"+

                          "success="+ success +

                          ", reason='"+ reason + '\''+

                          '}';

              }

           

              publicbooleanisSuccess() {

                  returnsuccess;

              }

           

              publicvoidsetSuccess(booleansuccess) {

                  this.success = success;

              }

           

              publicString getReason() {

                  returnreason;

              }

           

              publicvoidsetReason(String reason) {

                  this.reason = reason;

              }

          }

          這個(gè)類主要是提供給響應(yīng)時(shí)使用的,其中包含了響應(yīng)狀態(tài)以及響應(yīng)信息,接著再設(shè)計(jì)兩個(gè)登錄時(shí)會用到的消息對象。

          如下:

          publicclassLoginRequestMessage extendsMessage {

              privateString username;

              privateString password;

           

              publicLoginRequestMessage() {

              }

           

              @Override

              publicString toString() {

                  return"LoginRequestMessage{"+

                          "username='"+ username + '\''+

                          ", password='"+ password + '\''+

                          '}';

              }

           

              publicString getUsername() {

                  returnusername;

              }

           

              publicvoidsetUsername(String username) {

                  this.username = username;

              }

           

              publicString getPassword() {

                  returnpassword;

              }

           

              publicvoidsetPassword(String password) {

                  this.password = password;

              }

           

              publicLoginRequestMessage(String username, String password) {

                  this.username = username;

                  this.password = password;

              }

           

              @Override

              publicintgetMessageType() {

                  returnLoginRequestMessage;

              }

          }

          上述這個(gè)消息類,主要是提供給客戶端登錄時(shí)使用,本質(zhì)上也就是一個(gè)涵蓋用戶名、用戶密碼的對象而已,同時(shí)還有一個(gè)用來給服務(wù)端響應(yīng)時(shí)的響應(yīng)類。

          如下:

          publicclassLoginResponseMessage extendsAbstractResponseMessage {

              publicLoginResponseMessage(booleansuccess, String reason) {

                  super(success, reason);

              }

           

              @Override

              publicintgetMessageType() {

                  returnLoginResponseMessage;

              }

          }

          登錄響應(yīng)類的實(shí)現(xiàn)十分簡單,由登錄狀態(tài)和登錄消息組成,OK,接著來看看登錄的具體實(shí)現(xiàn)。

          首先在客戶端中,再通過pipeline添加一個(gè)處理器,如下:

          CountDownLatch WAIT_FOR_LOGIN = newCountDownLatch(1);

          AtomicBoolean LOGIN = newAtomicBoolean(false);

          AtomicBoolean EXIT = newAtomicBoolean(false);

          Scanner scanner = newScanner(System.in);

           

          ch.pipeline().addLast("client handler", newChannelInboundHandlerAdapter() {

              @Override

              publicvoidchannelActive(ChannelHandlerContext ctx) throwsException {

                  // 負(fù)責(zé)接收用戶在控制臺的輸入,負(fù)責(zé)向服務(wù)器發(fā)送各種消息

                  newThread(() -> {

                      System.out.println("請輸入用戶名:");

                      String username = scanner.nextLine();

                      if(EXIT.get()){

                          return;

                      }

                      System.out.println("請輸入密碼:");

                      String password = scanner.nextLine();

                      if(EXIT.get()){

                          return;

                      }

                      // 構(gòu)造消息對象

                      LoginRequestMessage message = newLoginRequestMessage(username, password);

                      System.out.println(message);

                      // 發(fā)送消息

                      ctx.writeAndFlush(message);

                      System.out.println("等待后續(xù)操作...");

                      try{

                          WAIT_FOR_LOGIN.await();

                      } catch(InterruptedException e) {

                          e.printStackTrace();

                      }

                      // 如果登錄失敗

                      if(!LOGIN.get()) {

                          ctx.channel().close();

                          return;

                      }

              }).start();

          }

          在與服務(wù)端建立連接成功之后,就提示用戶需要登錄,接著接收用戶輸入的用戶名、密碼,然后構(gòu)建出一個(gè)LoginRequestMessage消息對象,接著將其發(fā)送給服務(wù)端,由于前面裝載了自定義的協(xié)議編解碼器,所以消息在出站時(shí),這個(gè)Message對象會被序列化成字節(jié)碼,接著再服務(wù)端入站時(shí),又會被反序列化成消息對象,接著來看看服務(wù)端的實(shí)現(xiàn)。

          如下:

          @ChannelHandler.Sharable

          publicclassLoginRequestMessageHandler

                      extendsSimpleChannelInboundHandler<LoginRequestMessage> {

              @Override

              protectedvoidchannelRead0(ChannelHandlerContext ctx,

                          LoginRequestMessage msg) throwsException {

                  String username = msg.getUsername();

                  String password = msg.getPassword();

                  booleanlogin = UserServiceFactory.getUserService().login(username, password);

                  LoginResponseMessage message;

                  if(login) {

                      SessionFactory.getSession().bind(ctx.channel(), username);

                      message = newLoginResponseMessage(true, "登錄成功");

                  } else{

                      message = newLoginResponseMessage(false, "用戶名或密碼不正確");

                  }

                  ctx.writeAndFlush(message);

              }

          }

          在服務(wù)端中,新增了一個(gè)處理器類,繼承自SimpleChannelInboundHandler這個(gè)處理器,其中指定的泛型為LoginRequestMessage,這表示當(dāng)前處理器只關(guān)注這個(gè)類型的消息,當(dāng)出現(xiàn)登錄類型的消息時(shí),會進(jìn)入該處理器并觸發(fā)內(nèi)部的channelRead0()方法。

          在該方法中,獲取了登錄消息中的用戶名、密碼,接著對其做了基本的登錄效驗(yàn),如果用戶名存在并且密碼正確,就會返回登錄成功,否則會返回登錄失敗,最終登錄后的狀態(tài)會被封裝成一個(gè)LoginResponseMessage對象,然后寫回客戶端的通道中。

          當(dāng)然,為了該處理器能夠成功生效,這里需要將其裝載到服務(wù)端的pipeline上。

          如下:

          LoginRequestMessageHandler LOGIN_HANDLER = newLoginRequestMessageHandler();

          ch.pipeline().addLast(LOGIN_HANDLER);

          裝載好登錄處理器后,接著分別啟動(dòng)服務(wù)端、客戶端,測試結(jié)果如下:

          從圖中的效果來看,這里實(shí)現(xiàn)了最基本的登錄功能,估計(jì)有些小伙伴看到這里就有些暈了,但其實(shí)非常簡單,僅僅只是通過Netty在做數(shù)據(jù)交互而已,客戶端則提供輸入用戶名、密碼的功能,然后將用戶輸入的名稱、密碼發(fā)送給服務(wù)端,服務(wù)端提供登錄判斷的功能,最終根據(jù)判斷結(jié)果再向客戶端返回?cái)?shù)據(jù)罷了。

          9、實(shí)戰(zhàn)要點(diǎn)2:實(shí)現(xiàn)點(diǎn)對點(diǎn)單聊

          9.1概述

          有了基本的用戶登錄功能后,接著來看看如何實(shí)現(xiàn)點(diǎn)對點(diǎn)的單聊功能呢?

          首先我定義了一個(gè)會話接口,如下:

          publicinterfaceSession {

              voidbind(Channel channel, String username);

              voidunbind(Channel channel);

              Channel getChannel(String username);

          }

          這個(gè)接口中依舊只有三個(gè)方法,釋義如下:

          • 1)bind():傳入一個(gè)用戶名和Socket通道,讓兩者之間的產(chǎn)生綁定關(guān)系;
          • 2)unbind():取消一個(gè)用戶與某個(gè)Socket通道的綁定關(guān)系;
          • 3)getChannel():根據(jù)一個(gè)用戶名,獲取與其存在綁定關(guān)系的通道。

          該接口的實(shí)現(xiàn)類如下:

          publicclassSessionMemoryImpl implementsSession {

           

              privatefinalMap<String, Channel> usernameChannelMap = newConcurrentHashMap<>();

              privatefinalMap<Channel, String> channelUsernameMap = newConcurrentHashMap<>();

           

              @Override

              publicvoidbind(Channel channel, String username) {

                  usernameChannelMap.put(username, channel);

                  channelUsernameMap.put(channel, username);

                  channelAttributesMap.put(channel, newConcurrentHashMap<>());

              }

           

              @Override

              publicvoidunbind(Channel channel) {

                  String username = channelUsernameMap.remove(channel);

                  usernameChannelMap.remove(username);

                  channelAttributesMap.remove(channel);

              }

           

              @Override

              publicChannel getChannel(String username) {

                  returnusernameChannelMap.get(username);

              }

           

              @Override

              publicString toString() {

                  returnusernameChannelMap.toString();

              }

          }

          該實(shí)現(xiàn)類最關(guān)鍵的是其中的兩個(gè)Map容器,usernameChannelMap用來存儲所有用戶名與Socket通道的綁定關(guān)系,而channelUsernameMap則是反過來的順序,這主要是為了方便,即可以通過用戶名獲得對應(yīng)通道,也可以通過通道判斷出用戶名,實(shí)際上一個(gè)Map也能搞定,但還是那句話,主要為了簡單嘛~

          有了上述這個(gè)最簡單的會話管理功能后,就要著手實(shí)現(xiàn)具體的功能了,其實(shí)在前面實(shí)現(xiàn)登錄功能的時(shí)候,就用過這其中的bind()方法,也就是當(dāng)?shù)卿洺晒χ螅蜁?dāng)前發(fā)送登錄消息的通道,與正在登錄的用戶名產(chǎn)生綁定關(guān)系,這樣就方便后續(xù)實(shí)現(xiàn)單聊、群聊的功能。

          9.2定義單聊的消息對象

          與登錄時(shí)相同,由于需要在服務(wù)端和客戶端之間實(shí)現(xiàn)數(shù)據(jù)的轉(zhuǎn)發(fā),因此這里也需要兩個(gè)消息對象,用來作為數(shù)據(jù)交互的消息格式。

          如下:

          publicclassChatRequestMessage extendsMessage {

              privateString content;

              privateString to;

              privateString from;

           

              publicChatRequestMessage() {

              }

           

              publicChatRequestMessage(String from, String to, String content) {

                  this.from = from;

                  this.to = to;

                  this.content = content;

              }

              // 省略Get/Setting、toString()方法.....

          }

          上述這個(gè)類,是提供給客戶端用來發(fā)送消息數(shù)據(jù)的,其中主要包含了三個(gè)值,聊天的消息內(nèi)容、發(fā)送人與接收人。因?yàn)檫@里是需要實(shí)現(xiàn)一個(gè)IM聊天程序,所以并不是客戶端與服務(wù)端進(jìn)行數(shù)據(jù)交互,而是客戶端與客戶端之間進(jìn)行數(shù)據(jù)交互,服務(wù)端僅僅只提供消息轉(zhuǎn)發(fā)的功能,接著再構(gòu)建一個(gè)消息類。

          如下:

          publicclassChatResponseMessage extendsAbstractResponseMessage {

           

              privateString from;

              privateString content;

           

              @Override

              publicString toString() {

                  return"ChatResponseMessage{"+

                          "from='"+ from + '\''+

                          ", content='"+ content + '\''+

                          '}';

              }

           

              publicChatResponseMessage(booleansuccess, String reason) {

                  super(success, reason);

              }

           

              publicChatResponseMessage(String from, String content) {

                  this.from = from;

                  this.content = content;

              }

           

              @Override

              publicintgetMessageType() {

                  returnChatResponseMessage;

              }

              // 省略Get/Setting、toString()方法.....

          }

          這個(gè)類是提供給服務(wù)端用來轉(zhuǎn)發(fā)的,當(dāng)服務(wù)端收到一個(gè)聊天消息后,因?yàn)榱奶煜⒅邪私邮杖耍钥梢韵雀鶕?jù)接收人的用戶名,找到對應(yīng)的客戶端通道,然后再封裝成一個(gè)響應(yīng)消息,轉(zhuǎn)發(fā)給對應(yīng)的客戶端即可,下面來做具體實(shí)現(xiàn)。

          9.3實(shí)現(xiàn)點(diǎn)對點(diǎn)單聊功能

          由于聊天功能是提供給客戶端使用的,所以當(dāng)一個(gè)客戶端登錄成功之后,應(yīng)該暴露給用戶一個(gè)操作菜單,所以直接在原本客戶端的channelActive()方法中,登錄成功之后繼續(xù)加代碼即可。

          代碼如下:

          while(true) {

              System.out.println("==================================");

              System.out.println("\t1、發(fā)送單聊消息");

              System.out.println("\t2、發(fā)送群聊消息");

              System.out.println("\t3、創(chuàng)建一個(gè)群聊");

              System.out.println("\t4、獲取群聊成員");

              System.out.println("\t5、加入一個(gè)群聊");

              System.out.println("\t6、退出一個(gè)群聊");

              System.out.println("\t7、退出聊天系統(tǒng)");

              System.out.println("==================================");

              String command = scanner.nextLine();

          }

          首先會開啟一個(gè)死循環(huán),然后不斷接收用戶的操作,接著使用switch語法來對具體的菜單功能進(jìn)行實(shí)現(xiàn),先實(shí)現(xiàn)單聊功能。

          如下:

          switch(command){

              case"1":

                  System.out.print("請選擇你要發(fā)送消息給誰:");

                  String toUserName = scanner.nextLine();

                  System.out.print("請輸入你要發(fā)送的消息內(nèi)容:");

                  String content = scanner.nextLine();

                  ctx.writeAndFlush(newChatRequestMessage(username, toUserName, content));

                  break;

          }

          如果用戶選擇了單聊,接著會提示用戶選擇要發(fā)送消息給誰,這里也就是讓用戶輸入對方的用戶名,實(shí)際上如果有界面的話,這一步是并不需要用戶自己輸入的,而是提供窗口讓用戶點(diǎn)擊,比如QQ、微信一樣,想要給某個(gè)人發(fā)送消息時(shí),只需要點(diǎn)擊“他”的頭像私聊即可。

          等用戶選擇了聊天目標(biāo),并且輸入了消息內(nèi)容后,接著會構(gòu)建一個(gè)ChatRequestMessage消息對象,然后會發(fā)送給服務(wù)端,但這里先不看服務(wù)端的實(shí)現(xiàn),客戶端這邊還需要重寫一個(gè)方法。

          如下:

          @Override

          publicvoidchannelRead(ChannelHandlerContext ctx, Object msg) throwsException {

              System.out.println("收到消息:"+ msg);

              if((msg instanceofLoginResponseMessage)) {

                  LoginResponseMessage response = (LoginResponseMessage) msg;

                  if(response.isSuccess()) {

                      // 如果登錄成功

                      LOGIN.set(true);

                  }

                  // 喚醒 system in 線程

                  WAIT_FOR_LOGIN.countDown();

              }

          }

          前面的邏輯是在channelActive()方法中完成的,也就是連接建立成功后,就會讓用戶登錄,接著登錄成功之后會給用戶一個(gè)菜單欄,提供給用戶進(jìn)行操作,但前面的邏輯中一直沒有對服務(wù)端響應(yīng)的消息進(jìn)行處理,因此channelRead()方法中會對服務(wù)端響應(yīng)的數(shù)據(jù)進(jìn)行處理。

          channelRead()方法會在有數(shù)據(jù)可讀時(shí)被觸發(fā),所以當(dāng)服務(wù)端響應(yīng)數(shù)據(jù)時(shí),首先會判斷一下:目前服務(wù)端響應(yīng)的是不是登錄消息,如果是的話,則需要根據(jù)登錄的結(jié)果來喚醒前面channelActive()方法中的線程。如果目前服務(wù)端響應(yīng)的不是登錄消息,這也就意味著客戶端前面已經(jīng)登錄成功了,所以接著會直接打印一下收到的數(shù)據(jù)。

          OK,有了上述客戶端的代碼實(shí)現(xiàn)后,接著再來服務(wù)端多創(chuàng)建一個(gè)處理器。

          如下:

          @ChannelHandler.Sharable

          publicclassChatRequestMessageHandler

                      extendsSimpleChannelInboundHandler<ChatRequestMessage> {

              @Override

              protectedvoidchannelRead0(ChannelHandlerContext ctx,

                              ChatRequestMessage msg) throwsException {

                  String to = msg.getTo();

                  Channel channel = SessionFactory.getSession().getChannel(to);

                  // 在線

                  if(channel != null) {

                      channel.writeAndFlush(newChatResponseMessage(

                                  msg.getFrom(), msg.getContent()));

                  }

                  // 不在線

                  else{

                      ctx.writeAndFlush(newChatResponseMessage(

                                  false, "對方用戶不存在或者不在線"));

                  }

              }

          }

          這里依舊通過繼承SimpleChannelInboundHandler類的形式,來特別關(guān)注ChatRequestMessage單聊類型的消息,如果目前服務(wù)端收到的是單聊消息,則會進(jìn)入觸發(fā)該處理器的channelRead0()方法。

          該處理器內(nèi)部的邏輯也并不復(fù)雜,首先根據(jù)單聊消息的接收人,去找一下與之對應(yīng)的通道:

          • 1)如果根據(jù)用戶名查到了通道,表示接收人目前是登錄在線狀態(tài);
          • 2)反之,如果無法根據(jù)用戶名找到通道,表示對應(yīng)的用戶不存在或者沒有登錄。

          接著會根據(jù)上面的查詢結(jié)果,進(jìn)行對應(yīng)的結(jié)果返回:

          • 1)如果在線:把要發(fā)送的單聊消息,直接寫入至找到的通道中;
          • 2)如果不在線:向發(fā)送單聊消息的客戶端,返回用戶不存在或用戶不在線。

          有了這個(gè)處理器之后,接著還需要把該處理器裝載到服務(wù)端上,如下:

          ChatRequestMessageHandler CHAT_HANDLER = newChatRequestMessageHandler();

          ch.pipeline().addLast(CHAT_HANDLER);

          裝載好單聊處理器后,接著分別啟動(dòng)一個(gè)服務(wù)端、兩個(gè)客戶端,測試結(jié)果如下:

          從測試結(jié)果中可以明顯看出效果,其中的單聊功能的確已經(jīng)實(shí)現(xiàn),可以實(shí)現(xiàn)A→B用戶之間的單聊功能,兩者之間借助服務(wù)器轉(zhuǎn)發(fā),可以實(shí)現(xiàn)兩人私聊的功能。

          10、實(shí)戰(zhàn)要點(diǎn)3:打造多人聊天室

          10.1概述

          前面實(shí)現(xiàn)了兩個(gè)用戶之間的私聊功能,接著再來實(shí)現(xiàn)一個(gè)多人聊天室的功能,畢竟像QQ、微信、釘釘....等任何通訊軟件,都支持多人建立群聊的功能。

          但多人聊天室的功能,實(shí)現(xiàn)之前還需要先完成建群的功能,畢竟如果群都沒建立,自然無法向某個(gè)群內(nèi)發(fā)送數(shù)據(jù)。

          實(shí)現(xiàn)拉群也好,群聊也罷,其實(shí)現(xiàn)步驟依舊和前面相同,如下:

          • 1)先定義對應(yīng)的消息對象;
          • 2)實(shí)現(xiàn)客戶端發(fā)送對應(yīng)消息數(shù)據(jù)的功能;
          • 3)再寫一個(gè)服務(wù)端的群聊處理器,然后裝載到服務(wù)端上。

          10.2定義拉群的消息體

          首先來定義兩個(gè)拉群時(shí)用的消息體,如下:

          publicclassGroupCreateRequestMessage extendsMessage {

              privateString groupName;

              privateSet<String> members;

           

              publicGroupCreateRequestMessage(String groupName, Set<String> members) {

                  this.groupName = groupName;

                  this.members = members;

              }

           

              @Override

              publicintgetMessageType() {

                  returnGroupCreateRequestMessage;

              }

           

              // 省略其他Get/Settings、toString()方法.....

          }

          上述這個(gè)消息體是提供給客戶端使用的,其中主要存在兩個(gè)成員,也就是群名稱與群成員列表,存放所有群成員的容器選用了Set集合,因?yàn)镾et集合具備不可重復(fù)性,因此可以有效的避免同一用戶多次進(jìn)群,接著再來看看服務(wù)端響應(yīng)時(shí)用的消息體。

          如下:

          publicclassGroupCreateResponseMessage extendsAbstractResponseMessage {

              publicGroupCreateResponseMessage(booleansuccess, String reason) {

                  super(success, reason);

              }

           

              @Override

              publicintgetMessageType() {

                  returnGroupCreateResponseMessage;

              }

          }

          這個(gè)消息體的實(shí)現(xiàn)尤為簡單,僅僅只是給客戶端返回了拉群狀態(tài)以及拉群的附加信息。

          10.3定義群聊會話管理

          前面單聊有單聊的會話管理機(jī)制,而實(shí)現(xiàn)多人群聊時(shí),依舊需要有群聊的會話管理機(jī)制,首先封裝了一個(gè)群聊實(shí)體類。

          如下:

          publicclassGroup {

              // 聊天室名稱

              privateString name;

              // 聊天室成員

              privateSet<String> members;

           

              publicstaticfinalGroup EMPTY_GROUP = newGroup("empty", Collections.emptySet());

           

              publicGroup(String name, Set<String> members) {

                  this.name = name;

                  this.members = members;

              }

           

              // 省略其他Get/Settings、toString()方法.....

          }

          接著定義了一個(gè)群聊會話的頂級接口,如下:

          publicinterfaceGroupSession {

              // 創(chuàng)建一個(gè)群聊

              Group createGroup(String name, Set<String> members);

              // 加入某個(gè)群聊

              Group joinMember(String name, String member);

              // 移除群聊中的某個(gè)成員

              Group removeMember(String name, String member);

              // 解散一個(gè)群聊

              Group removeGroup(String name);

              // 獲取一個(gè)群聊的成員列表

              Set<String> getMembers(String name);

              // 獲取一個(gè)群聊所有在線用戶的Channel通道

              List<Channel> getMembersChannel(String name);

          }

          上述接口中,提供了幾個(gè)接口方法,其實(shí)也主要是群聊系統(tǒng)中的一些日常操作,如創(chuàng)群、加群、踢人、解散群、查看群成員....等功能,接著來看看該接口的實(shí)現(xiàn)者。

          如下:

          publicclassGroupSessionMemoryImpl implementsGroupSession {

              privatefinalMap<String, Group> groupMap = newConcurrentHashMap<>();

           

              @Override

              publicGroup createGroup(String name, Set<String> members) {

                  Group group = newGroup(name, members);

                  returngroupMap.putIfAbsent(name, group);

              }

           

              @Override

              publicGroup joinMember(String name, String member) {

                  returngroupMap.computeIfPresent(name, (key, value) -> {

                      value.getMembers().add(member);

                      returnvalue;

                  });

              }

           

              @Override

              publicGroup removeMember(String name, String member) {

                  returngroupMap.computeIfPresent(name, (key, value) -> {

                      value.getMembers().remove(member);

                      returnvalue;

                  });

              }

           

              @Override

              publicGroup removeGroup(String name) {

                  returngroupMap.remove(name);

              }

           

              @Override

              publicSet<String> getMembers(String name) {

                  returngroupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();

              }

           

              @Override

              publicList<Channel> getMembersChannel(String name) {

                  returngetMembers(name).stream()

                          .map(member -> SessionFactory.getSession().getChannel(member))

                          .filter(Objects::nonNull)

                          .collect(Collectors.toList());

              }

          }

          這個(gè)實(shí)現(xiàn)類沒啥好說的,重點(diǎn)記住里面有個(gè)Map容器即可,這個(gè)容器主要負(fù)責(zé)存儲所有群名稱與Group群聊對象的關(guān)系,后續(xù)可以通過群聊名稱,在這個(gè)容器中找到一個(gè)對應(yīng)群聊對象。同時(shí)為了方便后續(xù)調(diào)用這些接口,還提供了一個(gè)工具類。

          如下:

          publicabstractclassGroupSessionFactory {

              privatestaticGroupSession session = newGroupSessionMemoryImpl();

           

              publicstaticGroupSession getGroupSession() {

                  returnsession;

              }

          }

          很簡單,僅僅只實(shí)例化了一個(gè)群聊會話管理的實(shí)現(xiàn)類,因?yàn)檫@里沒有結(jié)合Spring來實(shí)現(xiàn),所以并不能依靠IOC技術(shù)來自動(dòng)管理Bean,因此咱們需要手動(dòng)創(chuàng)建出一個(gè)實(shí)例,以供于后續(xù)使用。

          10.4實(shí)現(xiàn)拉群功能

          前面客戶端的功能菜單中,3對應(yīng)著拉群功能,所以咱們需要對3做具體的功能實(shí)現(xiàn)。

          邏輯如下:

          case"3":

              System.out.print("請輸入你要?jiǎng)?chuàng)建的群聊昵稱:");

              String newGroupName = scanner.nextLine();

              System.out.print("請選擇你要邀請的群成員(不同成員用、分割):");

              String members = scanner.nextLine();

              Set<String> memberSet = newHashSet<>(Arrays.asList(members.split("、")));

              memberSet.add(username); // 加入自己

              ctx.writeAndFlush(newGroupCreateRequestMessage(newGroupName, memberSet));

              break;

          在該分支實(shí)現(xiàn)中,首先會要求用戶輸入一個(gè)群聊昵稱,接著需要輸入需要拉入群聊的用戶名稱,多個(gè)用戶之間使用、分割,接著會把用戶輸入的群成員以及自己,全部放入到一個(gè)Set集合中,最終組裝成一個(gè)拉群消息體,發(fā)送給服務(wù)端處理。

          服務(wù)端的處理器如下:

          @ChannelHandler.Sharable

          publicclassGroupCreateRequestMessageHandler

                  extendsSimpleChannelInboundHandler<GroupCreateRequestMessage> {

              @Override

              protectedvoidchannelRead0(ChannelHandlerContext ctx,

                          GroupCreateRequestMessage msg) throwsException {

                  String groupName = msg.getGroupName();

                  Set<String> members = msg.getMembers();

                  // 群管理器

                  GroupSession groupSession = GroupSessionFactory.getGroupSession();

                  Group group = groupSession.createGroup(groupName, members);

                  if(group == null) {

                      // 發(fā)生成功消息

                      ctx.writeAndFlush(newGroupCreateResponseMessage(true,

                                          groupName + "創(chuàng)建成功"));

                      // 發(fā)送拉群消息

                      List<Channel> channels = groupSession.getMembersChannel(groupName);

                      for(Channel channel : channels) {

                          channel.writeAndFlush(newGroupCreateResponseMessage(

                                              true, "您已被拉入"+ groupName));

                      }

                  } else{

                      ctx.writeAndFlush(newGroupCreateResponseMessage(

                                          false, groupName + "已經(jīng)存在"));

                  }

              }

          }

          這里依舊繼承了SimpleChannelInboundHandler類,只關(guān)心拉群的消息,當(dāng)客戶端出現(xiàn)拉群消息時(shí),首先會獲取用戶輸入的群昵稱和群成員,接著通過前面提供的創(chuàng)群接口,嘗試創(chuàng)建一個(gè)群聊,如果群聊已經(jīng)存在,則會創(chuàng)建失敗,反之則會創(chuàng)建成功,在創(chuàng)建群聊成功的情況下,會給所有的群成員發(fā)送一條“你已被拉入[XXX]”的消息。

          最后,同樣需要將該處理器裝載到服務(wù)端上,如下:

          GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =

                              newGroupCreateRequestMessageHandler();

          ch.pipeline().addLast(GROUP_CREATE_HANDLER);

          最后分別啟動(dòng)一個(gè)服務(wù)端、兩個(gè)客戶端進(jìn)行效果測試,如下:

          從上圖的測試結(jié)果來看,的確實(shí)現(xiàn)了咱們的拉群效果,一個(gè)用戶拉群之后,被邀請的成員都會收到來自于服務(wù)端的拉群提醒,這也就為后續(xù)群聊功能奠定了基礎(chǔ)。

          10.5定義群聊的消息體

          這里就不重復(fù)贅述了,還是之前的套路,定義一個(gè)客戶端用的消息體,如下:

          publicclassGroupChatRequestMessage extendsMessage {

              privateString content;

              privateString groupName;

              privateString from;

           

              publicGroupChatRequestMessage(String from, String groupName, String content) {

                  this.content = content;

                  this.groupName = groupName;

                  this.from = from;

              }

           

              @Override

              publicintgetMessageType() {

                  returnGroupChatRequestMessage;

              }

              // 省略其他Get/Settings、toString()方法.....

          }

          這個(gè)是客戶端用來發(fā)送群聊消息的消息體,其中存在三個(gè)成員,發(fā)送人、群聊昵稱、消息內(nèi)容,通過這三個(gè)成員,可以描述清楚任何一條群聊記錄,接著來看看服務(wù)端響應(yīng)時(shí)用的消息體。

          如下:

          publicclassGroupChatResponseMessage extendsAbstractResponseMessage {

              privateString from;

              privateString content;

           

              publicGroupChatResponseMessage(booleansuccess, String reason) {

                  super(success, reason);

              }

           

              publicGroupChatResponseMessage(String from, String content) {

                  this.from = from;

                  this.content = content;

              }

              @Override

              publicintgetMessageType() {

                  returnGroupChatResponseMessage;

              }

              // 省略其他Get/Settings、toString()方法.....

          }

          在這個(gè)消息體中,就省去了群聊昵稱這個(gè)成員,因?yàn)檫@個(gè)消息體的用處,主要是給服務(wù)端轉(zhuǎn)發(fā)給客戶端時(shí)使用的,因此不需要群聊昵稱,當(dāng)然,要也可以,我這里就直接省去了。

          10.6實(shí)現(xiàn)群聊功能

          依舊先來做客戶端的實(shí)現(xiàn),實(shí)現(xiàn)了客戶端之后再去完成服務(wù)端的實(shí)現(xiàn),客戶端實(shí)現(xiàn)如下:

          case"2":

              System.out.print("請選擇你要發(fā)送消息的群聊:");

              String groupName = scanner.nextLine();

              System.out.print("請輸入你要發(fā)送的消息內(nèi)容:");

              String groupContent = scanner.nextLine();

              ctx.writeAndFlush(newGroupChatRequestMessage(username, groupName, groupContent));

              break;

          因?yàn)榘l(fā)送群聊消息對應(yīng)著之前菜單中的2,所以這里對該分支進(jìn)行實(shí)現(xiàn),當(dāng)用戶選擇發(fā)送群聊消息時(shí),首先會讓用戶自己先選擇一個(gè)群聊,接著輸入要發(fā)送的消息內(nèi)容,接著組裝成一個(gè)群聊消息對象,發(fā)送給服務(wù)端處理。

          服務(wù)端的實(shí)現(xiàn)如下:

          @ChannelHandler.Sharable

          publicclassGroupChatRequestMessageHandler

                  extendsSimpleChannelInboundHandler<GroupChatRequestMessage> {

              @Override

              protectedvoidchannelRead0(ChannelHandlerContext ctx,

                          GroupChatRequestMessage msg) throwsException {

                  List<Channel> channels = GroupSessionFactory.getGroupSession()

                          .getMembersChannel(msg.getGroupName());

           

                  for(Channel channel : channels) {

                      channel.writeAndFlush(newGroupChatResponseMessage(

                                      msg.getFrom(), msg.getContent()));

                  }

              }

          }

          這里依舊定義了一個(gè)處理器,關(guān)于原因就不再重復(fù)啰嗦了,服務(wù)端對于群聊消息的實(shí)現(xiàn)額外簡單,也就是先根據(jù)用戶選擇的群昵稱,找到該群所有的群成員,然后依次遍歷成員列表,獲取對應(yīng)的Socket通道,轉(zhuǎn)發(fā)消息即可。

          接著將該處理器裝載到服務(wù)端pipeline上,然后分別啟動(dòng)一個(gè)服務(wù)端、兩個(gè)客戶端,進(jìn)行效果測試,如下:

          效果如上圖的注釋,基于上述的代碼測試,效果確實(shí)達(dá)到了咱們需要的群聊效果~

          10.7聊天室的其他功能實(shí)現(xiàn)

          到這里為止,實(shí)現(xiàn)了最基本的建群、群聊的功能,但對于踢人、加群、解散群....等一系列群聊功能還未曾實(shí)現(xiàn),但我這里就不繼續(xù)重復(fù)了。

          畢竟還是那個(gè)套路:

          • 1)定義對應(yīng)功能的消息體;
          • 2)客戶端向服務(wù)端發(fā)送對應(yīng)格式的消息;
          • 3)服務(wù)端編寫處理器,對特定的消息進(jìn)行處理。

          所以大家感興趣的情況下,可以根據(jù)上述步驟繼續(xù)進(jìn)行實(shí)現(xiàn),實(shí)現(xiàn)的過程沒有任何難度,重點(diǎn)就是時(shí)間問題罷了。

          11、本文小結(jié)

          看到這里,其實(shí)Netty實(shí)戰(zhàn)篇的內(nèi)容也就大致結(jié)束了,個(gè)人對于實(shí)戰(zhàn)篇的內(nèi)容并不怎么滿意,因?yàn)榕c最初設(shè)想的實(shí)現(xiàn)存在很大偏差,這是由于近期工作、生活狀態(tài)不對,所以內(nèi)容輸出也沒那么夯實(shí),對于這篇中的完整代碼實(shí)現(xiàn),也包括前面兩篇中的一些代碼實(shí)現(xiàn)(詳見“2、配套源碼”),大家感興趣可以自行Down下去玩玩。

          在我所撰寫的案例中,自定義協(xié)議可以繼續(xù)優(yōu)化,選擇性能更強(qiáng)的序列化方式,而聊天室也可以進(jìn)一步拓展,比如將用戶信息、群聊信息、聯(lián)系人信息都結(jié)合數(shù)據(jù)庫實(shí)現(xiàn),進(jìn)一步實(shí)現(xiàn)離線消息功能,但由于該案例的設(shè)計(jì)之初就有問題,所以是存在性能問題的,想要打造一款真正高性能的IM程序,那諸位可參考本系列前面的文章即可。

          12、系列文章

          跟著源碼學(xué)IM(一):手把手教你用Netty實(shí)現(xiàn)心跳機(jī)制、斷線重連機(jī)制

          跟著源碼學(xué)IM(二):自已開發(fā)IM很難?手把手教你擼一個(gè)Andriod版IM

          跟著源碼學(xué)IM(三):基于Netty,從零開發(fā)一個(gè)IM服務(wù)端

          跟著源碼學(xué)IM(四):拿起鍵盤就是干,教你徒手開發(fā)一套分布式IM系統(tǒng)

          跟著源碼學(xué)IM(五):正確理解IM長連接、心跳及重連機(jī)制,并動(dòng)手實(shí)現(xiàn)

          跟著源碼學(xué)IM(六):手把手教你用Go快速搭建高性能、可擴(kuò)展的IM系統(tǒng)

          跟著源碼學(xué)IM(七):手把手教你用WebSocket打造Web端IM聊天

          跟著源碼學(xué)IM(八):萬字長文,手把手教你用Netty打造IM聊天

          跟著源碼學(xué)IM(九):基于Netty實(shí)現(xiàn)一套分布式IM系統(tǒng)

          跟著源碼學(xué)IM(十):基于Netty,搭建高性能IM集群(含技術(shù)思路+源碼)

          跟著源碼學(xué)IM(十一):一套基于Netty的分布式高可用IM詳細(xì)設(shè)計(jì)與實(shí)現(xiàn)(有源碼)

          跟著源碼學(xué)IM(十二):基于Netty打造一款高性能的IM即時(shí)通訊程序》(* 本文)

          SpringBoot集成開源IM框架MobileIMSDK,實(shí)現(xiàn)即時(shí)通訊IM聊天功能

          13、參考資料

          [1] 淺談IM系統(tǒng)的架構(gòu)設(shè)計(jì)

          [2] 簡述移動(dòng)端IM開發(fā)的那些坑:架構(gòu)設(shè)計(jì)、通信協(xié)議和客戶端

          [3] 一套海量在線用戶的移動(dòng)端IM架構(gòu)設(shè)計(jì)實(shí)踐分享(含詳細(xì)圖文)

          [4] 一套原創(chuàng)分布式即時(shí)通訊(IM)系統(tǒng)理論架構(gòu)方案

          [5] 一套億級用戶的IM架構(gòu)技術(shù)干貨(上篇):整體架構(gòu)、服務(wù)拆分等

          [6] 一套億級用戶的IM架構(gòu)技術(shù)干貨(下篇):可靠性、有序性、弱網(wǎng)優(yōu)化等

          [7] 史上最通俗Netty框架入門長文:基本介紹、環(huán)境搭建、動(dòng)手實(shí)戰(zhàn)

          [8] 強(qiáng)列建議將Protobuf作為你的即時(shí)通訊應(yīng)用數(shù)據(jù)傳輸格式

          [9] IM通訊協(xié)議專題學(xué)習(xí)(一):Protobuf從入門到精通,一篇就夠!

          [10] 融云技術(shù)分享:全面揭秘億級IM消息的可靠投遞機(jī)制

          [11] IM群聊消息如此復(fù)雜,如何保證不丟不重?

          [12] 零基礎(chǔ)IM開發(fā)入門(四):什么是IM系統(tǒng)的消息時(shí)序一致性?

          [13] 如何保證IM實(shí)時(shí)消息的“時(shí)序性”與“一致性”?

          [14] 微信的海量IM聊天消息序列號生成實(shí)踐(算法原理篇)

          [15] 網(wǎng)易云信技術(shù)分享:IM中的萬人群聊技術(shù)方案實(shí)踐總結(jié)

          [16] 融云IM技術(shù)分享:萬人群聊消息投遞方案的思考和實(shí)踐

          [17] 為何基于TCP協(xié)議的移動(dòng)端IM仍然需要心跳保活機(jī)制?

          [18] 一文讀懂即時(shí)通訊應(yīng)用中的網(wǎng)絡(luò)心跳包機(jī)制:作用、原理、實(shí)現(xiàn)思路等

          [19] 微信團(tuán)隊(duì)原創(chuàng)分享:Android版微信后臺保活實(shí)戰(zhàn)分享(網(wǎng)絡(luò)保活篇)

          [20] 融云技術(shù)分享:融云安卓端IM產(chǎn)品的網(wǎng)絡(luò)鏈路保活技術(shù)實(shí)踐

          [21] 徹底搞懂TCP協(xié)議層的KeepAlive保活機(jī)制

          [22] 深度解密釘釘即時(shí)消息服務(wù)DTIM的技術(shù)設(shè)計(jì)

          (本文已同步發(fā)布于:http://www.52im.net/thread-4530-1-1.html



          作者:Jack Jiang (點(diǎn)擊作者姓名進(jìn)入Github)
          出處:http://www.52im.net/space-uid-1.html
          交流:歡迎加入即時(shí)通訊開發(fā)交流群 215891622
          討論:http://www.52im.net/
          Jack Jiang同時(shí)是【原創(chuàng)Java Swing外觀工程BeautyEye】【輕量級移動(dòng)端即時(shí)通訊框架MobileIMSDK】的作者,可前往下載交流。
          本博文 歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明出處(也可前往 我的52im.net 找到我)。


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          Jack Jiang的 Mail: jb2011@163.com, 聯(lián)系QQ: 413980957, 微信: hellojackjiang
          主站蜘蛛池模板: 彭州市| 河津市| 偃师市| 湖北省| 新蔡县| 开封市| 新宾| 三门县| 揭东县| 屏东市| 东阳市| 沁阳市| 东莞市| 团风县| 武夷山市| 苍山县| 常德市| 宜丰县| 元江| 贵南县| 荔浦县| 隆化县| 蒙自县| 闽侯县| 康定县| 曲阳县| 新田县| 临潭县| 苏州市| 沅江市| 稻城县| 睢宁县| 什邡市| 乌鲁木齐县| 卢湾区| 宁波市| 铜陵市| 沈阳市| 五常市| 桦南县| 凌海市|