跟著源碼學(xué)IM(十二):基于Netty打造一款高性能的IM即時(shí)通訊程序
Posted on 2023-11-30 12:28 Jack Jiang 閱讀(99) 評論(0) 編輯 收藏本文由竹子愛熊貓分享,原題“(十一)Netty實(shí)戰(zhàn)篇:基于Netty框架打造一款高性能的IM即時(shí)通訊程序”,本文有修訂和改動(dòng)。
1、引言
關(guān)于Netty網(wǎng)絡(luò)框架的內(nèi)容,前面已經(jīng)講了兩個(gè)章節(jié),但總歸來說難以真正掌握,畢竟只是對其中一個(gè)個(gè)組件進(jìn)行講解,很難讓諸位將其串起來形成一條線,所以本章中則會結(jié)合實(shí)戰(zhàn)案例,對Netty進(jìn)行更深層次的學(xué)習(xí)與掌握,實(shí)戰(zhàn)案例也并不難,一個(gè)非常樸素的IM聊天程序。
原本打算做個(gè)多人斗地主練習(xí)程序,但那需要織入過多的業(yè)務(wù)邏輯,因此一方面會帶來不必要的理解難度,讓案例更為復(fù)雜化,另一方面代碼量也會偏多,所以最終依舊選擇實(shí)現(xiàn)基本的IM聊天程序,既簡單,又能加深對Netty的理解。

- 移動(dòng)端IM開發(fā)入門文章:《新手入門一篇就夠:從零開發(fā)移動(dòng)端IM》
- 開源IM框架源碼:https://github.com/JackJiang2011/MobileIMSDK(備用地址點(diǎn)此)
(本文已同步發(fā)布于:http://www.52im.net/thread-4530-1-1.html)
2、配套源碼
本文配套源碼的開源托管地址是:
- 1)主地址:https://github.com/liuhaijieAdmin/springboot-netty
- 2)備地址:https://github.com/52im/springboot-netty
3、知識準(zhǔn)備
關(guān)于 Netty 是什么,這里簡單介紹下:
Netty 是一個(gè) Java 開源框架。Netty 提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
也就是說,Netty 是一個(gè)基于 NIO 的客戶、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡單的開發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶,服務(wù)端應(yīng)用。
Netty 相當(dāng)簡化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如,TCP 和 UDP 的 Socket 服務(wù)開發(fā)。
有關(guān)Netty的入門文章:
- 1)新手入門:目前為止最透徹的的Netty高性能原理和框架架構(gòu)解析
- 2)寫給初學(xué)者:Java高性能NIO框架Netty的學(xué)習(xí)方法和進(jìn)階策略
- 3)史上最通俗Netty框架入門長文:基本介紹、環(huán)境搭建、動(dòng)手實(shí)戰(zhàn)
如果你連Java NIO都不知道,下面的文章建議優(yōu)先讀:
- 1)少啰嗦!一分鐘帶你讀懂Java的NIO和經(jīng)典IO的區(qū)別
- 2)史上最強(qiáng)Java NIO入門:擔(dān)心從入門到放棄的,請讀這篇!
- 3)Java的BIO和NIO很難懂?用代碼實(shí)踐給你看,再不懂我轉(zhuǎn)行!
Netty源碼和API 在線查閱地址:
4、基于Netty設(shè)計(jì)通信協(xié)議
協(xié)議,這玩意兒相信大家肯定不陌生了,簡單回顧一下協(xié)議的概念:網(wǎng)絡(luò)協(xié)議是指一種通信雙方都必須遵守的約定,兩個(gè)不同的端,按照一定的格式對數(shù)據(jù)進(jìn)行“編碼”,同時(shí)按照相同的規(guī)則進(jìn)行“解碼”,從而實(shí)現(xiàn)兩者之間的數(shù)據(jù)傳輸與通信。
當(dāng)自己想要打造一款I(lǐng)M通信程序時(shí),對于消息的封裝、拆分也同樣需要設(shè)計(jì)一個(gè)協(xié)議,通信的兩端都必須遵守該協(xié)議工作,這也是實(shí)現(xiàn)通信程序的前提。
但為什么需要通信協(xié)議呢?
因?yàn)門CP/IP中是基于流的方式傳輸消息,消息與消息之間沒有邊界,而協(xié)議的目的則在于約定消息的樣式、邊界等。
5、Redis通信的RESP協(xié)議參考學(xué)習(xí)
不知大家是否還記得之前我聊到的RESP客戶端協(xié)議,這是Redis提供的一種客戶端通信協(xié)議。如果想要操作Redis,就必須遵守該協(xié)議的格式發(fā)送數(shù)據(jù)。
這個(gè)協(xié)議特別簡單,如下:
- 1)首先要求所有命令,都以*開頭,后面跟著具體的子命令數(shù)量,接著用換行符分割;
- 2)接著需要先用$符號聲明每個(gè)子命令的長度,然后再用換行符分割;
- 3)最后再拼接上具體的子命令,同樣用換行符分割。
這樣描述有些令人難懂,那就直接看個(gè)案例,例如一條簡單set命令。
如下:
客戶端命令:
setname ZhuZi
轉(zhuǎn)變?yōu)镽ESP指令:
*3
$3
set
$4
name
$5
ZhuZi
按照Redis的規(guī)定,但凡滿足RESP協(xié)議的客戶端,都可以直接連接并操作Redis服務(wù)端,這也就意味著咱們可以直接通過Netty來手寫一個(gè)Redis客戶端。
代碼如下:
// 基于Netty、RESP協(xié)議實(shí)現(xiàn)的Redis客戶端
publicclassRedisClient {
// 換行符的ASCII碼
staticfinalbyte[] LINE = {13, 10};
publicstaticvoidmain(String[] args) {
EventLoopGroup worker = newNioEventLoopGroup();
Bootstrap client = newBootstrap();
try{
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(newChannelInitializer<SocketChannel>() {
@Override
protectedvoidinitChannel(SocketChannel socketChannel)
throwsException {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(newChannelInboundHandlerAdapter(){
// 通道建立成功后調(diào)用:向Redis發(fā)送一條set命令
@Override
publicvoidchannelActive(ChannelHandlerContext ctx)
throwsException {
String command = "set name ZhuZi";
ByteBuf buffer = respCommand(command);
ctx.channel().writeAndFlush(buffer);
}
// Redis響應(yīng)數(shù)據(jù)時(shí)觸發(fā):打印Redis的響應(yīng)結(jié)果
@Override
publicvoidchannelRead(ChannelHandlerContext ctx,
Object msg) throwsException {
// 接受Redis服務(wù)端執(zhí)行指令后的結(jié)果
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(CharsetUtil.UTF_8));
}
});
}
});
// 根據(jù)IP、端口連接Redis服務(wù)端
client.connect("192.168.12.129", 6379).sync();
} catch(Exception e){
e.printStackTrace();
}
}
privatestaticByteBuf respCommand(String command){
// 先對傳入的命令以空格進(jìn)行分割
String[] commands = command.split(" ");
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 遵循RESP協(xié)議:先寫入指令的個(gè)數(shù)
buffer.writeBytes(("*"+ commands.length).getBytes());
buffer.writeBytes(LINE);
// 接著分別寫入每個(gè)指令的長度以及具體值
for(String s : commands) {
buffer.writeBytes(("$"+ s.length()).getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes(s.getBytes());
buffer.writeBytes(LINE);
}
// 把轉(zhuǎn)換成RESP格式的命令返回
returnbuffer;
}
}
在上述這個(gè)案例中,也僅僅只是通過respCommand()這個(gè)方法,對用戶輸入的指令進(jìn)行了轉(zhuǎn)換。同時(shí)在上面通過Netty,與Redis的地址、端口建立了連接。在連接建立成功后,就會向Redis發(fā)送一條轉(zhuǎn)換成RESP指令的set命令。接著等待Redis的響應(yīng)結(jié)果并輸出,如下:
+OK
因?yàn)檫@是一條寫指令,所以當(dāng)Redis收到執(zhí)行完成后,最終就會返回一個(gè)OK,大家也可直接去Redis中查詢,也依舊能夠查詢到剛剛寫入的name這個(gè)鍵值。
6、HTTP超文本傳輸協(xié)議參考學(xué)習(xí)
前面咱們自己針對于Redis的RESP協(xié)議,對用戶指令進(jìn)行了封裝,然后發(fā)往Redis執(zhí)行。
但對于這些常用的協(xié)議,Netty早已提供好了現(xiàn)成的處理器,想要使用時(shí)無需從頭開發(fā),可以直接使用現(xiàn)成的處理器來實(shí)現(xiàn)。
比如現(xiàn)在咱們可以基于Netty提供的處理器,實(shí)現(xiàn)一個(gè)簡單的HTTP服務(wù)器。
代碼如下:
// 基于Netty提供的處理器實(shí)現(xiàn)HTTP服務(wù)器
publicclassHttpServer {
publicstaticvoidmain(String[] args) throwsInterruptedException {
EventLoopGroup boss = newNioEventLoopGroup();
EventLoopGroup worker = newNioEventLoopGroup();
ServerBootstrap server = newServerBootstrap();
server
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(newChannelInitializer<NioSocketChannel>() {
@Override
protectedvoidinitChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加一個(gè)Netty提供的HTTP處理器
pipeline.addLast(newHttpServerCodec());
pipeline.addLast(newChannelInboundHandlerAdapter() {
@Override
publicvoidchannelRead(ChannelHandlerContext ctx,
Object msg) throwsException {
// 在這里輸出一下消息的類型
System.out.println("消息類型:"+ msg.getClass());
super.channelRead(ctx, msg);
}
});
pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {
@Override
protectedvoidchannelRead0(ChannelHandlerContext ctx,
HttpRequest msg) throwsException {
System.out.println("客戶端的請求路徑:"+ msg.uri());
// 創(chuàng)建一個(gè)響應(yīng)對象,版本號與客戶端保持一致,狀態(tài)碼為OK/200
DefaultFullHttpResponse response =
newDefaultFullHttpResponse(
msg.protocolVersion(),
HttpResponseStatus.OK);
// 構(gòu)造響應(yīng)內(nèi)容
byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();
// 設(shè)置響應(yīng)頭:告訴客戶端本次響應(yīng)的數(shù)據(jù)長度
response.headers().setInt(
HttpHeaderNames.CONTENT_LENGTH,content.length);
// 設(shè)置響應(yīng)主體
response.content().writeBytes(content);
// 向客戶端寫入響應(yīng)數(shù)據(jù)
ctx.writeAndFlush(response);
}
});
}
})
.bind("127.0.0.1",8888)
.sync();
}
}
在該案例中,咱們就未曾手動(dòng)對HTTP的數(shù)據(jù)包進(jìn)行拆包處理了,而是在服務(wù)端的pipeline上添加了一個(gè)HttpServerCodec處理器,這個(gè)處理器是Netty官方提供的。
其類繼承關(guān)系如下:
publicfinalclassHttpServerCodec
extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implementsSourceCodec {
// ......
}
觀察會發(fā)現(xiàn),該類繼承自CombinedChannelDuplexHandler這個(gè)組合類,它組合了編碼器、解碼器。
這也就意味著HttpServerCodec即可以對客戶端的數(shù)據(jù)做解碼,也可以對服務(wù)端響應(yīng)的數(shù)據(jù)做編碼。
同時(shí)除開添加了這個(gè)處理器外,在第二個(gè)處理器中打印了一下客戶端的消息類型,最后一個(gè)處理器中,對客戶端的請求做出了響應(yīng),其實(shí)也就是返回了一句話而已。
此時(shí)在瀏覽器輸入http://127.0.0.1:8888/index.html,結(jié)果如下:
消息類型:classio.netty.handler.codec.http.DefaultHttpRequest
消息類型:classio.netty.handler.codec.http.LastHttpContent$1
客戶端的請求路徑:/index.html
此時(shí)來看結(jié)果,客戶端的請求會被解析成兩個(gè)部分:
- 1)第一個(gè)是請求信息;
- 2)第二個(gè)是主體信息。
但按理來說瀏覽器發(fā)出的請求,屬于GET類型的請求,GET請求是沒有請求體信息的,但Netty依舊會解析成兩部分~,只不過GET請求的第二部分是空的。
在第三個(gè)處理器中,咱們直接向客戶端返回了一個(gè)h1標(biāo)簽,同時(shí)也要記得在響應(yīng)頭里面,加上響應(yīng)內(nèi)容的長度信息,否則瀏覽器的加載圈,會一直不同的轉(zhuǎn)動(dòng),畢竟瀏覽器也不知道內(nèi)容有多長,就會一直反復(fù)加載,嘗試等待更多的數(shù)據(jù)。
7、自定義消息傳輸協(xié)議
7.1概述
Netty除開提供了HTTP協(xié)議的處理器外,還提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列協(xié)議的實(shí)現(xiàn),具體定義位于io.netty.handler.codec這個(gè)包下,當(dāng)然,咱們也可以自己實(shí)現(xiàn)自定義協(xié)議,按照自己的邏輯對數(shù)據(jù)進(jìn)行編解碼處理。
很多基于Netty開發(fā)的中間件/組件,其內(nèi)部基本上都開發(fā)了專屬的通信協(xié)議,以此來作為不同節(jié)點(diǎn)間通信的基礎(chǔ),所以解下來咱們基于Netty也來自己設(shè)計(jì)一款通信協(xié)議,這也會作為后續(xù)實(shí)現(xiàn)聊天程序時(shí)的基礎(chǔ)。
所謂的協(xié)議設(shè)計(jì),其實(shí)僅僅只需要按照一定約束,實(shí)現(xiàn)編碼器與解碼器即可,發(fā)送方在發(fā)出數(shù)據(jù)之前,會經(jīng)過編碼器對數(shù)據(jù)進(jìn)行處理,而接收方在收到數(shù)據(jù)之前,則會由解碼器對數(shù)據(jù)進(jìn)行處理。
7.2自定義協(xié)議的要素
在自定義傳輸協(xié)議時(shí),咱們必然需要考慮幾個(gè)因素,如下:
- 1)魔數(shù):用來第一時(shí)間判斷是否為自己需要的數(shù)據(jù)包;
- 2)版本號:提高協(xié)議的拓展性,方便后續(xù)對協(xié)議進(jìn)行升級;
- 3)序列化算法:消息正文具體該使用哪種方式進(jìn)行序列化傳輸,例如Json、ProtoBuf、JDK...;
- 4)消息類型:第一時(shí)間判斷出當(dāng)前消息的類型;
- 5)消息序號:為了實(shí)現(xiàn)雙工通信,客戶端和服務(wù)端之間收/發(fā)消息不會相互阻塞;
- 6)正文長度:提供給LTC解碼器使用,防止解碼時(shí)出現(xiàn)粘包、半包的現(xiàn)象;
- 7)消息正文:本次消息要傳輸?shù)木唧w數(shù)據(jù)。
在設(shè)計(jì)協(xié)議時(shí),一個(gè)完整的協(xié)議應(yīng)該涵蓋上述所說的幾方面,這樣才能提供雙方通信時(shí)的基礎(chǔ)。
基于上述幾個(gè)字段,能夠在第一時(shí)間內(nèi)判斷出:
- 1)消息是否可用;
- 2)當(dāng)前協(xié)議版本;
- 3)消息的具體類型;
- 4)消息的長度等各類信息。
從而給后續(xù)處理器使用(自定義的協(xié)議規(guī)則本身就是一個(gè)編解碼處理器而已)。
7.3自定義協(xié)議實(shí)戰(zhàn)
前面簡單聊到過,所謂的自定義協(xié)議就是自己規(guī)定消息格式,以及自己實(shí)現(xiàn)編/解碼器對消息實(shí)現(xiàn)封裝/拆解,所以這里想要自定義一個(gè)消息協(xié)議,就只需要滿足前面兩個(gè)條件即可。
因此實(shí)現(xiàn)如下:
@ChannelHandler.Sharable
publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {
// 消息出站時(shí)會經(jīng)過的編碼方法(將原生消息對象封裝成自定義協(xié)議的消息格式)
@Override
protectedvoidencode(ChannelHandlerContext ctx, Message msg,
List<Object> list) throwsException {
ByteBuf outMsg = ctx.alloc().buffer();
// 前五個(gè)字節(jié)作為魔數(shù)
byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};
outMsg.writeBytes(magicNumber);
// 一個(gè)字節(jié)作為版本號
outMsg.writeByte(1);
// 一個(gè)字節(jié)表示序列化方式 0:JDK、1:Json、2:ProtoBuf.....
outMsg.writeByte(0);
// 一個(gè)字節(jié)用于表示消息類型
outMsg.writeByte(msg.getMessageType());
// 四個(gè)字節(jié)表示消息序號
outMsg.writeInt(msg.getSequenceId());
// 使用Java-Serializable的方式對消息對象進(jìn)行序列化
ByteArrayOutputStream bos = newByteArrayOutputStream();
ObjectOutputStream oos = newObjectOutputStream(bos);
oos.writeObject(msg);
byte[] msgBytes = bos.toByteArray();
// 使用四個(gè)字節(jié)描述消息正文的長度
outMsg.writeInt(msgBytes.length);
// 將序列化后的消息對象作為消息正文
outMsg.writeBytes(msgBytes);
// 將封裝好的數(shù)據(jù)傳遞給下一個(gè)處理器
list.add(outMsg);
}
// 消息入站時(shí)會經(jīng)過的解碼方法(將自定義格式的消息轉(zhuǎn)變?yōu)榫唧w的消息對象)
@Override
protectedvoiddecode(ChannelHandlerContext ctx,
ByteBuf inMsg, List<Object> list) throwsException {
// 讀取前五個(gè)字節(jié)得到魔數(shù)
byte[] magicNumber = newbyte[5];
inMsg.readBytes(magicNumber,0,5);
// 再讀取一個(gè)字節(jié)得到版本號
byteversion = inMsg.readByte();
// 再讀取一個(gè)字節(jié)得到序列化方式
byteserializableType = inMsg.readByte();
// 再讀取一個(gè)字節(jié)得到消息類型
bytemessageType = inMsg.readByte();
// 再讀取四個(gè)字節(jié)得到消息序號
intsequenceId = inMsg.readInt();
// 再讀取四個(gè)字節(jié)得到消息正文長度
intmessageLength = inMsg.readInt();
// 再根據(jù)正文長度讀取序列化后的字節(jié)正文數(shù)據(jù)
byte[] msgBytes = newbyte[messageLength];
inMsg.readBytes(msgBytes,0,messageLength);
// 對于讀取到的消息正文進(jìn)行反序列化,最終得到具體的消息對象
ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);
ObjectInputStream ois = newObjectInputStream(bis);
Message message = (Message) ois.readObject();
// 最終把反序列化得到的消息對象傳遞給后續(xù)的處理器
list.add(message);
}
}
上面自定義的處理器中,繼承了MessageToMessageCodec類,主要負(fù)責(zé)將數(shù)據(jù)在原生ByteBuf與Message之間進(jìn)行相互轉(zhuǎn)換,而Message對象是自定義的消息對象,這里暫且無需過多關(guān)心。
其中主要實(shí)現(xiàn)了兩個(gè)方法:
- 1)encode():出站時(shí)會經(jīng)過的編碼方法,會將原生消息對象按自定義的協(xié)議封裝成對應(yīng)的字節(jié)數(shù)據(jù);
- 2)decode():入站時(shí)會經(jīng)過的解碼方法,會將協(xié)議格式的字節(jié)數(shù)據(jù),轉(zhuǎn)變?yōu)榫唧w的消息對象。
上述自定義的協(xié)議,也就是一定規(guī)則的字節(jié)數(shù)據(jù),每條消息數(shù)據(jù)的組成如下:
- 1)魔數(shù):使用第1~5個(gè)字節(jié)來描述,這個(gè)魔數(shù)值可以按自己的想法自定義;
- 2)版本號:使用第6個(gè)字節(jié)來描述,不同數(shù)字表示不同版本;
- 3)序列化算法:使用第7個(gè)字節(jié)來描述,不同數(shù)字表示不同序列化方式;
- 4)消息類型:使用第8個(gè)字節(jié)來描述,不同的消息類型使用不同數(shù)字表示;
- 5)消息序號:使用第9~12個(gè)字節(jié)來描述,其實(shí)就是一個(gè)四字節(jié)的整數(shù);
- 6)正文長度:使用第13~16個(gè)字節(jié)來描述,也是一個(gè)四字節(jié)的整數(shù);
- 7)消息正文:長度不固定,根據(jù)每次具體發(fā)送的數(shù)據(jù)來決定。
在其中,為了實(shí)現(xiàn)簡單,這里的序列化方式,則采用的是JDK默認(rèn)的Serializable接口方式,但這種方式生成的對象字節(jié)較大,實(shí)際情況中最好還是選擇谷歌的ProtoBuf方式,這種算法屬于序列化算法中,性能最佳的一種落地實(shí)現(xiàn)。
當(dāng)然,這個(gè)自定義的協(xié)議是提供給后續(xù)的聊天業(yè)務(wù)使用的,但這種實(shí)戰(zhàn)型的內(nèi)容分享,基本上代碼量較高,所以大家看起來會有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑馬Netty視頻教程》二次改良的,因此如若感覺文字描述較為枯燥,可直接點(diǎn)擊前面給出的鏈接,觀看P101~P121視頻進(jìn)行學(xué)習(xí)。
最后來觀察一下,大家會發(fā)現(xiàn),在咱們定義的這個(gè)協(xié)議編解碼處理器上,存在著一個(gè)@ChannelHandler.Sharable注解,這個(gè)注解的作用是干嗎的呢?其實(shí)很簡單,用來標(biāo)識當(dāng)前處理器是否可在多線程環(huán)境下使用,如果帶有該注解的處理器,則表示可以在多個(gè)通道間共用,因此只需要?jiǎng)?chuàng)建一個(gè)即可,反之同理,如果不帶有該注解的處理器,則每個(gè)通道需要單獨(dú)創(chuàng)建使用。
PS:如果你想系統(tǒng)學(xué)習(xí)Protobuf,可以從以下文章入手:
《如何選擇即時(shí)通訊應(yīng)用的數(shù)據(jù)傳輸格式》
《強(qiáng)列建議將Protobuf作為你的即時(shí)通訊應(yīng)用數(shù)據(jù)傳輸格式》
《IM通訊協(xié)議專題學(xué)習(xí)(一):Protobuf從入門到精通,一篇就夠!》
《IM通訊協(xié)議專題學(xué)習(xí)(二):快速理解Protobuf的背景、原理、使用、優(yōu)缺點(diǎn)》
《IM通訊協(xié)議專題學(xué)習(xí)(三):由淺入深,從根上理解Protobuf的編解碼原理》
《IM通訊協(xié)議專題學(xué)習(xí)(四):從Base64到Protobuf,詳解Protobuf的數(shù)據(jù)編碼原理》
《IM通訊協(xié)議專題學(xué)習(xí)(八):金蝶隨手記團(tuán)隊(duì)的Protobuf應(yīng)用實(shí)踐(原理篇)》
8、實(shí)戰(zhàn)要點(diǎn)1:IM程序的用戶模塊
8.1概述
聊天、聊天,自然是得先有人,然后才能進(jìn)行聊天溝通。與QQ、微信類似,如果你想要使用某款聊天程序時(shí),前提都得是先具備一個(gè)對應(yīng)的賬戶才行。
因此在咱們設(shè)計(jì)IM系統(tǒng)之處,那也需要對應(yīng)的用戶功能實(shí)現(xiàn)。但這里為了簡單,同樣不再結(jié)合數(shù)據(jù)庫實(shí)現(xiàn)完整的用戶模塊了,而是基于內(nèi)存實(shí)現(xiàn)用戶的管理。
如下:
publicinterfaceUserService {
booleanlogin(String username, String password);
}
這是用戶模塊的頂層接口,僅僅只提供了一個(gè)登錄接口,關(guān)于注冊、鑒權(quán)、等級.....等一系列功能,大家感興趣的可在后續(xù)進(jìn)行拓展實(shí)現(xiàn),接著來看看該接口的實(shí)現(xiàn)類。
如下:
publicclassUserServiceMemoryImpl implementsUserService {
privateMap<String, String> allUserMap = newConcurrentHashMap<>();
{
// 在代碼塊中對用戶列表進(jìn)行初始化,向其中添加了兩個(gè)用戶信息
allUserMap.put("ZhuZi", "123");
allUserMap.put("XiongMao", "123");
}
@Override
publicbooleanlogin(String username, String password) {
String pass = allUserMap.get(username);
if(pass == null) {
returnfalse;
}
returnpass.equals(password);
}
}
這個(gè)實(shí)現(xiàn)類并未結(jié)合數(shù)據(jù)庫來實(shí)現(xiàn),而是僅僅在程序啟動(dòng)時(shí),通過代碼塊的方式,加載了ZhuZi、XiongMao兩個(gè)用戶信息并放入內(nèi)存的Map容器中,這里有興趣的小伙伴,可自行將Map容器換成數(shù)據(jù)庫的表即可。
其中實(shí)現(xiàn)的login()登錄接口尤為簡單,僅僅只是判斷了一下有沒有對應(yīng)用戶,如果有的話則看看密碼是否正確,正確返回true,密碼錯(cuò)誤則返回false。是的,我所寫的登錄功能就是這么簡單,走個(gè)簡單的過場,哈哈哈~
8.2服務(wù)端、客戶端的基礎(chǔ)架構(gòu)
基本的用戶模塊有了,但這里還未曾套入具體實(shí)現(xiàn),因此先簡單的搭建出服務(wù)端、客戶端的架構(gòu),然后再基于構(gòu)建好的架構(gòu)實(shí)現(xiàn)基礎(chǔ)的用戶登錄功能。
服務(wù)端的基礎(chǔ)搭建如下:
publicclassChatServer {
publicstaticvoidmain(String[] args) {
NioEventLoopGroup boss = newNioEventLoopGroup();
NioEventLoopGroup worker = newNioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();
try{
ServerBootstrap serverBootstrap = newServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(newChannelInitializer<SocketChannel>() {
@Override
protectedvoidinitChannel(SocketChannel ch) throwsException {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = serverBootstrap.bind(8888).sync().channel();
channel.closeFuture().sync();
} catch(InterruptedException e) {
System.out.println("服務(wù)端出現(xiàn)錯(cuò)誤:"+ e);
} finally{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
服務(wù)端的代碼目前很簡單,僅僅只是裝載了一個(gè)自己的協(xié)議編/解碼處理器,然后就是一些老步驟,不再過多的重復(fù)贅述,接著再來搭建一個(gè)簡單的客戶端。
代碼實(shí)現(xiàn)如下:
publicclassChatClient {
publicstaticvoidmain(String[] args) {
NioEventLoopGroup group = newNioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();
try{
Bootstrap bootstrap = newBootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(newChannelInitializer<SocketChannel>() {
@Override
protectedvoidinitChannel(SocketChannel ch) throwsException {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
channel.closeFuture().sync();
} catch(Exception e) {
System.out.println("客戶端出現(xiàn)錯(cuò)誤:"+ e);
} finally{
group.shutdownGracefully();
}
}
}
目前僅僅只是與服務(wù)端建立了連接,然后裝載了一個(gè)自定義的編解碼器,到這里就搭建了最基本的服務(wù)端、客戶端的基礎(chǔ)架構(gòu),接著來基于它實(shí)現(xiàn)簡單的登錄功能。
8.3用戶登錄功能的實(shí)現(xiàn)
對于登錄功能,由于需要在服務(wù)端與客戶端之間傳輸數(shù)據(jù),因此咱們可以設(shè)計(jì)一個(gè)消息對象,但由于后續(xù)單聊、群聊都需要發(fā)送不同的消息格式,因此先設(shè)計(jì)出一個(gè)父類。
如下:
publicabstractclassMessage implementsSerializable {
privateintsequenceId;
privateintmessageType;
@Override
publicString toString() {
return"Message{"+
"sequenceId="+ sequenceId +
", messageType="+ messageType +
'}';
}
publicintgetSequenceId() {
returnsequenceId;
}
publicvoidsetSequenceId(intsequenceId) {
this.sequenceId = sequenceId;
}
publicvoidsetMessageType(intmessageType) {
this.messageType = messageType;
}
publicabstractintgetMessageType();
publicstaticfinalintLoginRequestMessage = 0;
publicstaticfinalintLoginResponseMessage = 1;
publicstaticfinalintChatRequestMessage = 2;
publicstaticfinalintChatResponseMessage = 3;
publicstaticfinalintGroupCreateRequestMessage = 4;
publicstaticfinalintGroupCreateResponseMessage = 5;
publicstaticfinalintGroupJoinRequestMessage = 6;
publicstaticfinalintGroupJoinResponseMessage = 7;
publicstaticfinalintGroupQuitRequestMessage = 8;
publicstaticfinalintGroupQuitResponseMessage = 9;
publicstaticfinalintGroupChatRequestMessage = 10;
publicstaticfinalintGroupChatResponseMessage = 11;
publicstaticfinalintGroupMembersRequestMessage = 12;
publicstaticfinalintGroupMembersResponseMessage = 13;
publicstaticfinalintPingMessage = 14;
publicstaticfinalintPongMessage = 15;
}
在這個(gè)消息父類中,定義了多種消息類型的狀態(tài)碼,不同的消息類型對應(yīng)不同數(shù)字,同時(shí)其中還設(shè)計(jì)了一個(gè)抽象方法,即getMessageType(),該方法交給具體的子類實(shí)現(xiàn),每個(gè)子類返回各自的消息類型,為了方便后續(xù)拓展,這里又創(chuàng)建了一個(gè)抽象類作為中間類。
如下:
publicabstractclassAbstractResponseMessage extendsMessage {
privatebooleansuccess;
privateString reason;
publicAbstractResponseMessage() {
}
publicAbstractResponseMessage(booleansuccess, String reason) {
this.success = success;
this.reason = reason;
}
@Override
publicString toString() {
return"AbstractResponseMessage{"+
"success="+ success +
", reason='"+ reason + '\''+
'}';
}
publicbooleanisSuccess() {
returnsuccess;
}
publicvoidsetSuccess(booleansuccess) {
this.success = success;
}
publicString getReason() {
returnreason;
}
publicvoidsetReason(String reason) {
this.reason = reason;
}
}
這個(gè)類主要是提供給響應(yīng)時(shí)使用的,其中包含了響應(yīng)狀態(tài)以及響應(yīng)信息,接著再設(shè)計(jì)兩個(gè)登錄時(shí)會用到的消息對象。
如下:
publicclassLoginRequestMessage extendsMessage {
privateString username;
privateString password;
publicLoginRequestMessage() {
}
@Override
publicString toString() {
return"LoginRequestMessage{"+
"username='"+ username + '\''+
", password='"+ password + '\''+
'}';
}
publicString getUsername() {
returnusername;
}
publicvoidsetUsername(String username) {
this.username = username;
}
publicString getPassword() {
returnpassword;
}
publicvoidsetPassword(String password) {
this.password = password;
}
publicLoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
publicintgetMessageType() {
returnLoginRequestMessage;
}
}
上述這個(gè)消息類,主要是提供給客戶端登錄時(shí)使用,本質(zhì)上也就是一個(gè)涵蓋用戶名、用戶密碼的對象而已,同時(shí)還有一個(gè)用來給服務(wù)端響應(yīng)時(shí)的響應(yīng)類。
如下:
publicclassLoginResponseMessage extendsAbstractResponseMessage {
publicLoginResponseMessage(booleansuccess, String reason) {
super(success, reason);
}
@Override
publicintgetMessageType() {
returnLoginResponseMessage;
}
}
登錄響應(yīng)類的實(shí)現(xiàn)十分簡單,由登錄狀態(tài)和登錄消息組成,OK,接著來看看登錄的具體實(shí)現(xiàn)。
首先在客戶端中,再通過pipeline添加一個(gè)處理器,如下:
CountDownLatch WAIT_FOR_LOGIN = newCountDownLatch(1);
AtomicBoolean LOGIN = newAtomicBoolean(false);
AtomicBoolean EXIT = newAtomicBoolean(false);
Scanner scanner = newScanner(System.in);
ch.pipeline().addLast("client handler", newChannelInboundHandlerAdapter() {
@Override
publicvoidchannelActive(ChannelHandlerContext ctx) throwsException {
// 負(fù)責(zé)接收用戶在控制臺的輸入,負(fù)責(zé)向服務(wù)器發(fā)送各種消息
newThread(() -> {
System.out.println("請輸入用戶名:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("請輸入密碼:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 構(gòu)造消息對象
LoginRequestMessage message = newLoginRequestMessage(username, password);
System.out.println(message);
// 發(fā)送消息
ctx.writeAndFlush(message);
System.out.println("等待后續(xù)操作...");
try{
WAIT_FOR_LOGIN.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
// 如果登錄失敗
if(!LOGIN.get()) {
ctx.channel().close();
return;
}
}).start();
}
在與服務(wù)端建立連接成功之后,就提示用戶需要登錄,接著接收用戶輸入的用戶名、密碼,然后構(gòu)建出一個(gè)LoginRequestMessage消息對象,接著將其發(fā)送給服務(wù)端,由于前面裝載了自定義的協(xié)議編解碼器,所以消息在出站時(shí),這個(gè)Message對象會被序列化成字節(jié)碼,接著再服務(wù)端入站時(shí),又會被反序列化成消息對象,接著來看看服務(wù)端的實(shí)現(xiàn)。
如下:
@ChannelHandler.Sharable
publicclassLoginRequestMessageHandler
extendsSimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protectedvoidchannelRead0(ChannelHandlerContext ctx,
LoginRequestMessage msg) throwsException {
String username = msg.getUsername();
String password = msg.getPassword();
booleanlogin = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if(login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = newLoginResponseMessage(true, "登錄成功");
} else{
message = newLoginResponseMessage(false, "用戶名或密碼不正確");
}
ctx.writeAndFlush(message);
}
}
在服務(wù)端中,新增了一個(gè)處理器類,繼承自SimpleChannelInboundHandler這個(gè)處理器,其中指定的泛型為LoginRequestMessage,這表示當(dāng)前處理器只關(guān)注這個(gè)類型的消息,當(dāng)出現(xiàn)登錄類型的消息時(shí),會進(jìn)入該處理器并觸發(fā)內(nèi)部的channelRead0()方法。
在該方法中,獲取了登錄消息中的用戶名、密碼,接著對其做了基本的登錄效驗(yàn),如果用戶名存在并且密碼正確,就會返回登錄成功,否則會返回登錄失敗,最終登錄后的狀態(tài)會被封裝成一個(gè)LoginResponseMessage對象,然后寫回客戶端的通道中。
當(dāng)然,為了該處理器能夠成功生效,這里需要將其裝載到服務(wù)端的pipeline上。
如下:
LoginRequestMessageHandler LOGIN_HANDLER = newLoginRequestMessageHandler();
ch.pipeline().addLast(LOGIN_HANDLER);
裝載好登錄處理器后,接著分別啟動(dòng)服務(wù)端、客戶端,測試結(jié)果如下:

從圖中的效果來看,這里實(shí)現(xiàn)了最基本的登錄功能,估計(jì)有些小伙伴看到這里就有些暈了,但其實(shí)非常簡單,僅僅只是通過Netty在做數(shù)據(jù)交互而已,客戶端則提供輸入用戶名、密碼的功能,然后將用戶輸入的名稱、密碼發(fā)送給服務(wù)端,服務(wù)端提供登錄判斷的功能,最終根據(jù)判斷結(jié)果再向客戶端返回?cái)?shù)據(jù)罷了。
9、實(shí)戰(zhàn)要點(diǎn)2:實(shí)現(xiàn)點(diǎn)對點(diǎn)單聊
9.1概述
有了基本的用戶登錄功能后,接著來看看如何實(shí)現(xiàn)點(diǎn)對點(diǎn)的單聊功能呢?
首先我定義了一個(gè)會話接口,如下:
publicinterfaceSession {
voidbind(Channel channel, String username);
voidunbind(Channel channel);
Channel getChannel(String username);
}
這個(gè)接口中依舊只有三個(gè)方法,釋義如下:
- 1)bind():傳入一個(gè)用戶名和Socket通道,讓兩者之間的產(chǎn)生綁定關(guān)系;
- 2)unbind():取消一個(gè)用戶與某個(gè)Socket通道的綁定關(guān)系;
- 3)getChannel():根據(jù)一個(gè)用戶名,獲取與其存在綁定關(guān)系的通道。
該接口的實(shí)現(xiàn)類如下:
publicclassSessionMemoryImpl implementsSession {
privatefinalMap<String, Channel> usernameChannelMap = newConcurrentHashMap<>();
privatefinalMap<Channel, String> channelUsernameMap = newConcurrentHashMap<>();
@Override
publicvoidbind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, newConcurrentHashMap<>());
}
@Override
publicvoidunbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
publicChannel getChannel(String username) {
returnusernameChannelMap.get(username);
}
@Override
publicString toString() {
returnusernameChannelMap.toString();
}
}
該實(shí)現(xiàn)類最關(guān)鍵的是其中的兩個(gè)Map容器,usernameChannelMap用來存儲所有用戶名與Socket通道的綁定關(guān)系,而channelUsernameMap則是反過來的順序,這主要是為了方便,即可以通過用戶名獲得對應(yīng)通道,也可以通過通道判斷出用戶名,實(shí)際上一個(gè)Map也能搞定,但還是那句話,主要為了簡單嘛~
有了上述這個(gè)最簡單的會話管理功能后,就要著手實(shí)現(xiàn)具體的功能了,其實(shí)在前面實(shí)現(xiàn)登錄功能的時(shí)候,就用過這其中的bind()方法,也就是當(dāng)?shù)卿洺晒χ螅蜁?dāng)前發(fā)送登錄消息的通道,與正在登錄的用戶名產(chǎn)生綁定關(guān)系,這樣就方便后續(xù)實(shí)現(xiàn)單聊、群聊的功能。
9.2定義單聊的消息對象
與登錄時(shí)相同,由于需要在服務(wù)端和客戶端之間實(shí)現(xiàn)數(shù)據(jù)的轉(zhuǎn)發(fā),因此這里也需要兩個(gè)消息對象,用來作為數(shù)據(jù)交互的消息格式。
如下:
publicclassChatRequestMessage extendsMessage {
privateString content;
privateString to;
privateString from;
publicChatRequestMessage() {
}
publicChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
// 省略Get/Setting、toString()方法.....
}
上述這個(gè)類,是提供給客戶端用來發(fā)送消息數(shù)據(jù)的,其中主要包含了三個(gè)值,聊天的消息內(nèi)容、發(fā)送人與接收人。因?yàn)檫@里是需要實(shí)現(xiàn)一個(gè)IM聊天程序,所以并不是客戶端與服務(wù)端進(jìn)行數(shù)據(jù)交互,而是客戶端與客戶端之間進(jìn)行數(shù)據(jù)交互,服務(wù)端僅僅只提供消息轉(zhuǎn)發(fā)的功能,接著再構(gòu)建一個(gè)消息類。
如下:
publicclassChatResponseMessage extendsAbstractResponseMessage {
privateString from;
privateString content;
@Override
publicString toString() {
return"ChatResponseMessage{"+
"from='"+ from + '\''+
", content='"+ content + '\''+
'}';
}
publicChatResponseMessage(booleansuccess, String reason) {
super(success, reason);
}
publicChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
publicintgetMessageType() {
returnChatResponseMessage;
}
// 省略Get/Setting、toString()方法.....
}
這個(gè)類是提供給服務(wù)端用來轉(zhuǎn)發(fā)的,當(dāng)服務(wù)端收到一個(gè)聊天消息后,因?yàn)榱奶煜⒅邪私邮杖耍钥梢韵雀鶕?jù)接收人的用戶名,找到對應(yīng)的客戶端通道,然后再封裝成一個(gè)響應(yīng)消息,轉(zhuǎn)發(fā)給對應(yīng)的客戶端即可,下面來做具體實(shí)現(xiàn)。
9.3實(shí)現(xiàn)點(diǎn)對點(diǎn)單聊功能
由于聊天功能是提供給客戶端使用的,所以當(dāng)一個(gè)客戶端登錄成功之后,應(yīng)該暴露給用戶一個(gè)操作菜單,所以直接在原本客戶端的channelActive()方法中,登錄成功之后繼續(xù)加代碼即可。
代碼如下:
while(true) {
System.out.println("==================================");
System.out.println("\t1、發(fā)送單聊消息");
System.out.println("\t2、發(fā)送群聊消息");
System.out.println("\t3、創(chuàng)建一個(gè)群聊");
System.out.println("\t4、獲取群聊成員");
System.out.println("\t5、加入一個(gè)群聊");
System.out.println("\t6、退出一個(gè)群聊");
System.out.println("\t7、退出聊天系統(tǒng)");
System.out.println("==================================");
String command = scanner.nextLine();
}
首先會開啟一個(gè)死循環(huán),然后不斷接收用戶的操作,接著使用switch語法來對具體的菜單功能進(jìn)行實(shí)現(xiàn),先實(shí)現(xiàn)單聊功能。
如下:
switch(command){
case"1":
System.out.print("請選擇你要發(fā)送消息給誰:");
String toUserName = scanner.nextLine();
System.out.print("請輸入你要發(fā)送的消息內(nèi)容:");
String content = scanner.nextLine();
ctx.writeAndFlush(newChatRequestMessage(username, toUserName, content));
break;
}
如果用戶選擇了單聊,接著會提示用戶選擇要發(fā)送消息給誰,這里也就是讓用戶輸入對方的用戶名,實(shí)際上如果有界面的話,這一步是并不需要用戶自己輸入的,而是提供窗口讓用戶點(diǎn)擊,比如QQ、微信一樣,想要給某個(gè)人發(fā)送消息時(shí),只需要點(diǎn)擊“他”的頭像私聊即可。
等用戶選擇了聊天目標(biāo),并且輸入了消息內(nèi)容后,接著會構(gòu)建一個(gè)ChatRequestMessage消息對象,然后會發(fā)送給服務(wù)端,但這里先不看服務(wù)端的實(shí)現(xiàn),客戶端這邊還需要重寫一個(gè)方法。
如下:
@Override
publicvoidchannelRead(ChannelHandlerContext ctx, Object msg) throwsException {
System.out.println("收到消息:"+ msg);
if((msg instanceofLoginResponseMessage)) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if(response.isSuccess()) {
// 如果登錄成功
LOGIN.set(true);
}
// 喚醒 system in 線程
WAIT_FOR_LOGIN.countDown();
}
}
前面的邏輯是在channelActive()方法中完成的,也就是連接建立成功后,就會讓用戶登錄,接著登錄成功之后會給用戶一個(gè)菜單欄,提供給用戶進(jìn)行操作,但前面的邏輯中一直沒有對服務(wù)端響應(yīng)的消息進(jìn)行處理,因此channelRead()方法中會對服務(wù)端響應(yīng)的數(shù)據(jù)進(jìn)行處理。
channelRead()方法會在有數(shù)據(jù)可讀時(shí)被觸發(fā),所以當(dāng)服務(wù)端響應(yīng)數(shù)據(jù)時(shí),首先會判斷一下:目前服務(wù)端響應(yīng)的是不是登錄消息,如果是的話,則需要根據(jù)登錄的結(jié)果來喚醒前面channelActive()方法中的線程。如果目前服務(wù)端響應(yīng)的不是登錄消息,這也就意味著客戶端前面已經(jīng)登錄成功了,所以接著會直接打印一下收到的數(shù)據(jù)。
OK,有了上述客戶端的代碼實(shí)現(xiàn)后,接著再來服務(wù)端多創(chuàng)建一個(gè)處理器。
如下:
@ChannelHandler.Sharable
publicclassChatRequestMessageHandler
extendsSimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protectedvoidchannelRead0(ChannelHandlerContext ctx,
ChatRequestMessage msg) throwsException {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
// 在線
if(channel != null) {
channel.writeAndFlush(newChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
// 不在線
else{
ctx.writeAndFlush(newChatResponseMessage(
false, "對方用戶不存在或者不在線"));
}
}
}
這里依舊通過繼承SimpleChannelInboundHandler類的形式,來特別關(guān)注ChatRequestMessage單聊類型的消息,如果目前服務(wù)端收到的是單聊消息,則會進(jìn)入觸發(fā)該處理器的channelRead0()方法。
該處理器內(nèi)部的邏輯也并不復(fù)雜,首先根據(jù)單聊消息的接收人,去找一下與之對應(yīng)的通道:
- 1)如果根據(jù)用戶名查到了通道,表示接收人目前是登錄在線狀態(tài);
- 2)反之,如果無法根據(jù)用戶名找到通道,表示對應(yīng)的用戶不存在或者沒有登錄。
接著會根據(jù)上面的查詢結(jié)果,進(jìn)行對應(yīng)的結(jié)果返回:
- 1)如果在線:把要發(fā)送的單聊消息,直接寫入至找到的通道中;
- 2)如果不在線:向發(fā)送單聊消息的客戶端,返回用戶不存在或用戶不在線。
有了這個(gè)處理器之后,接著還需要把該處理器裝載到服務(wù)端上,如下:
ChatRequestMessageHandler CHAT_HANDLER = newChatRequestMessageHandler();
ch.pipeline().addLast(CHAT_HANDLER);
裝載好單聊處理器后,接著分別啟動(dòng)一個(gè)服務(wù)端、兩個(gè)客戶端,測試結(jié)果如下:

從測試結(jié)果中可以明顯看出效果,其中的單聊功能的確已經(jīng)實(shí)現(xiàn),可以實(shí)現(xiàn)A→B用戶之間的單聊功能,兩者之間借助服務(wù)器轉(zhuǎn)發(fā),可以實(shí)現(xiàn)兩人私聊的功能。
10、實(shí)戰(zhàn)要點(diǎn)3:打造多人聊天室
10.1概述
前面實(shí)現(xiàn)了兩個(gè)用戶之間的私聊功能,接著再來實(shí)現(xiàn)一個(gè)多人聊天室的功能,畢竟像QQ、微信、釘釘....等任何通訊軟件,都支持多人建立群聊的功能。
但多人聊天室的功能,實(shí)現(xiàn)之前還需要先完成建群的功能,畢竟如果群都沒建立,自然無法向某個(gè)群內(nèi)發(fā)送數(shù)據(jù)。
實(shí)現(xiàn)拉群也好,群聊也罷,其實(shí)現(xiàn)步驟依舊和前面相同,如下:
- 1)先定義對應(yīng)的消息對象;
- 2)實(shí)現(xiàn)客戶端發(fā)送對應(yīng)消息數(shù)據(jù)的功能;
- 3)再寫一個(gè)服務(wù)端的群聊處理器,然后裝載到服務(wù)端上。
10.2定義拉群的消息體
首先來定義兩個(gè)拉群時(shí)用的消息體,如下:
publicclassGroupCreateRequestMessage extendsMessage {
privateString groupName;
privateSet<String> members;
publicGroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
publicintgetMessageType() {
returnGroupCreateRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
上述這個(gè)消息體是提供給客戶端使用的,其中主要存在兩個(gè)成員,也就是群名稱與群成員列表,存放所有群成員的容器選用了Set集合,因?yàn)镾et集合具備不可重復(fù)性,因此可以有效的避免同一用戶多次進(jìn)群,接著再來看看服務(wù)端響應(yīng)時(shí)用的消息體。
如下:
publicclassGroupCreateResponseMessage extendsAbstractResponseMessage {
publicGroupCreateResponseMessage(booleansuccess, String reason) {
super(success, reason);
}
@Override
publicintgetMessageType() {
returnGroupCreateResponseMessage;
}
}
這個(gè)消息體的實(shí)現(xiàn)尤為簡單,僅僅只是給客戶端返回了拉群狀態(tài)以及拉群的附加信息。
10.3定義群聊會話管理
前面單聊有單聊的會話管理機(jī)制,而實(shí)現(xiàn)多人群聊時(shí),依舊需要有群聊的會話管理機(jī)制,首先封裝了一個(gè)群聊實(shí)體類。
如下:
publicclassGroup {
// 聊天室名稱
privateString name;
// 聊天室成員
privateSet<String> members;
publicstaticfinalGroup EMPTY_GROUP = newGroup("empty", Collections.emptySet());
publicGroup(String name, Set<String> members) {
this.name = name;
this.members = members;
}
// 省略其他Get/Settings、toString()方法.....
}
接著定義了一個(gè)群聊會話的頂級接口,如下:
publicinterfaceGroupSession {
// 創(chuàng)建一個(gè)群聊
Group createGroup(String name, Set<String> members);
// 加入某個(gè)群聊
Group joinMember(String name, String member);
// 移除群聊中的某個(gè)成員
Group removeMember(String name, String member);
// 解散一個(gè)群聊
Group removeGroup(String name);
// 獲取一個(gè)群聊的成員列表
Set<String> getMembers(String name);
// 獲取一個(gè)群聊所有在線用戶的Channel通道
List<Channel> getMembersChannel(String name);
}
上述接口中,提供了幾個(gè)接口方法,其實(shí)也主要是群聊系統(tǒng)中的一些日常操作,如創(chuàng)群、加群、踢人、解散群、查看群成員....等功能,接著來看看該接口的實(shí)現(xiàn)者。
如下:
publicclassGroupSessionMemoryImpl implementsGroupSession {
privatefinalMap<String, Group> groupMap = newConcurrentHashMap<>();
@Override
publicGroup createGroup(String name, Set<String> members) {
Group group = newGroup(name, members);
returngroupMap.putIfAbsent(name, group);
}
@Override
publicGroup joinMember(String name, String member) {
returngroupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
returnvalue;
});
}
@Override
publicGroup removeMember(String name, String member) {
returngroupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
returnvalue;
});
}
@Override
publicGroup removeGroup(String name) {
returngroupMap.remove(name);
}
@Override
publicSet<String> getMembers(String name) {
returngroupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
publicList<Channel> getMembersChannel(String name) {
returngetMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
這個(gè)實(shí)現(xiàn)類沒啥好說的,重點(diǎn)記住里面有個(gè)Map容器即可,這個(gè)容器主要負(fù)責(zé)存儲所有群名稱與Group群聊對象的關(guān)系,后續(xù)可以通過群聊名稱,在這個(gè)容器中找到一個(gè)對應(yīng)群聊對象。同時(shí)為了方便后續(xù)調(diào)用這些接口,還提供了一個(gè)工具類。
如下:
publicabstractclassGroupSessionFactory {
privatestaticGroupSession session = newGroupSessionMemoryImpl();
publicstaticGroupSession getGroupSession() {
returnsession;
}
}
很簡單,僅僅只實(shí)例化了一個(gè)群聊會話管理的實(shí)現(xiàn)類,因?yàn)檫@里沒有結(jié)合Spring來實(shí)現(xiàn),所以并不能依靠IOC技術(shù)來自動(dòng)管理Bean,因此咱們需要手動(dòng)創(chuàng)建出一個(gè)實(shí)例,以供于后續(xù)使用。
10.4實(shí)現(xiàn)拉群功能
前面客戶端的功能菜單中,3對應(yīng)著拉群功能,所以咱們需要對3做具體的功能實(shí)現(xiàn)。
邏輯如下:
case"3":
System.out.print("請輸入你要?jiǎng)?chuàng)建的群聊昵稱:");
String newGroupName = scanner.nextLine();
System.out.print("請選擇你要邀請的群成員(不同成員用、分割):");
String members = scanner.nextLine();
Set<String> memberSet = newHashSet<>(Arrays.asList(members.split("、")));
memberSet.add(username); // 加入自己
ctx.writeAndFlush(newGroupCreateRequestMessage(newGroupName, memberSet));
break;
在該分支實(shí)現(xiàn)中,首先會要求用戶輸入一個(gè)群聊昵稱,接著需要輸入需要拉入群聊的用戶名稱,多個(gè)用戶之間使用、分割,接著會把用戶輸入的群成員以及自己,全部放入到一個(gè)Set集合中,最終組裝成一個(gè)拉群消息體,發(fā)送給服務(wù)端處理。
服務(wù)端的處理器如下:
@ChannelHandler.Sharable
publicclassGroupCreateRequestMessageHandler
extendsSimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protectedvoidchannelRead0(ChannelHandlerContext ctx,
GroupCreateRequestMessage msg) throwsException {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
// 群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if(group == null) {
// 發(fā)生成功消息
ctx.writeAndFlush(newGroupCreateResponseMessage(true,
groupName + "創(chuàng)建成功"));
// 發(fā)送拉群消息
List<Channel> channels = groupSession.getMembersChannel(groupName);
for(Channel channel : channels) {
channel.writeAndFlush(newGroupCreateResponseMessage(
true, "您已被拉入"+ groupName));
}
} else{
ctx.writeAndFlush(newGroupCreateResponseMessage(
false, groupName + "已經(jīng)存在"));
}
}
}
這里依舊繼承了SimpleChannelInboundHandler類,只關(guān)心拉群的消息,當(dāng)客戶端出現(xiàn)拉群消息時(shí),首先會獲取用戶輸入的群昵稱和群成員,接著通過前面提供的創(chuàng)群接口,嘗試創(chuàng)建一個(gè)群聊,如果群聊已經(jīng)存在,則會創(chuàng)建失敗,反之則會創(chuàng)建成功,在創(chuàng)建群聊成功的情況下,會給所有的群成員發(fā)送一條“你已被拉入[XXX]”的消息。
最后,同樣需要將該處理器裝載到服務(wù)端上,如下:
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =
newGroupCreateRequestMessageHandler();
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
最后分別啟動(dòng)一個(gè)服務(wù)端、兩個(gè)客戶端進(jìn)行效果測試,如下:

從上圖的測試結(jié)果來看,的確實(shí)現(xiàn)了咱們的拉群效果,一個(gè)用戶拉群之后,被邀請的成員都會收到來自于服務(wù)端的拉群提醒,這也就為后續(xù)群聊功能奠定了基礎(chǔ)。
10.5定義群聊的消息體
這里就不重復(fù)贅述了,還是之前的套路,定義一個(gè)客戶端用的消息體,如下:
publicclassGroupChatRequestMessage extendsMessage {
privateString content;
privateString groupName;
privateString from;
publicGroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
publicintgetMessageType() {
returnGroupChatRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
這個(gè)是客戶端用來發(fā)送群聊消息的消息體,其中存在三個(gè)成員,發(fā)送人、群聊昵稱、消息內(nèi)容,通過這三個(gè)成員,可以描述清楚任何一條群聊記錄,接著來看看服務(wù)端響應(yīng)時(shí)用的消息體。
如下:
publicclassGroupChatResponseMessage extendsAbstractResponseMessage {
privateString from;
privateString content;
publicGroupChatResponseMessage(booleansuccess, String reason) {
super(success, reason);
}
publicGroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
publicintgetMessageType() {
returnGroupChatResponseMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
在這個(gè)消息體中,就省去了群聊昵稱這個(gè)成員,因?yàn)檫@個(gè)消息體的用處,主要是給服務(wù)端轉(zhuǎn)發(fā)給客戶端時(shí)使用的,因此不需要群聊昵稱,當(dāng)然,要也可以,我這里就直接省去了。
10.6實(shí)現(xiàn)群聊功能
依舊先來做客戶端的實(shí)現(xiàn),實(shí)現(xiàn)了客戶端之后再去完成服務(wù)端的實(shí)現(xiàn),客戶端實(shí)現(xiàn)如下:
case"2":
System.out.print("請選擇你要發(fā)送消息的群聊:");
String groupName = scanner.nextLine();
System.out.print("請輸入你要發(fā)送的消息內(nèi)容:");
String groupContent = scanner.nextLine();
ctx.writeAndFlush(newGroupChatRequestMessage(username, groupName, groupContent));
break;
因?yàn)榘l(fā)送群聊消息對應(yīng)著之前菜單中的2,所以這里對該分支進(jìn)行實(shí)現(xiàn),當(dāng)用戶選擇發(fā)送群聊消息時(shí),首先會讓用戶自己先選擇一個(gè)群聊,接著輸入要發(fā)送的消息內(nèi)容,接著組裝成一個(gè)群聊消息對象,發(fā)送給服務(wù)端處理。
服務(wù)端的實(shí)現(xiàn)如下:
@ChannelHandler.Sharable
publicclassGroupChatRequestMessageHandler
extendsSimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protectedvoidchannelRead0(ChannelHandlerContext ctx,
GroupChatRequestMessage msg) throwsException {
List<Channel> channels = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
for(Channel channel : channels) {
channel.writeAndFlush(newGroupChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
}
}
這里依舊定義了一個(gè)處理器,關(guān)于原因就不再重復(fù)啰嗦了,服務(wù)端對于群聊消息的實(shí)現(xiàn)額外簡單,也就是先根據(jù)用戶選擇的群昵稱,找到該群所有的群成員,然后依次遍歷成員列表,獲取對應(yīng)的Socket通道,轉(zhuǎn)發(fā)消息即可。
接著將該處理器裝載到服務(wù)端pipeline上,然后分別啟動(dòng)一個(gè)服務(wù)端、兩個(gè)客戶端,進(jìn)行效果測試,如下:

效果如上圖的注釋,基于上述的代碼測試,效果確實(shí)達(dá)到了咱們需要的群聊效果~
10.7聊天室的其他功能實(shí)現(xiàn)
到這里為止,實(shí)現(xiàn)了最基本的建群、群聊的功能,但對于踢人、加群、解散群....等一系列群聊功能還未曾實(shí)現(xiàn),但我這里就不繼續(xù)重復(fù)了。
畢竟還是那個(gè)套路:
- 1)定義對應(yīng)功能的消息體;
- 2)客戶端向服務(wù)端發(fā)送對應(yīng)格式的消息;
- 3)服務(wù)端編寫處理器,對特定的消息進(jìn)行處理。
所以大家感興趣的情況下,可以根據(jù)上述步驟繼續(xù)進(jìn)行實(shí)現(xiàn),實(shí)現(xiàn)的過程沒有任何難度,重點(diǎn)就是時(shí)間問題罷了。
11、本文小結(jié)
看到這里,其實(shí)Netty實(shí)戰(zhàn)篇的內(nèi)容也就大致結(jié)束了,個(gè)人對于實(shí)戰(zhàn)篇的內(nèi)容并不怎么滿意,因?yàn)榕c最初設(shè)想的實(shí)現(xiàn)存在很大偏差,這是由于近期工作、生活狀態(tài)不對,所以內(nèi)容輸出也沒那么夯實(shí),對于這篇中的完整代碼實(shí)現(xiàn),也包括前面兩篇中的一些代碼實(shí)現(xiàn)(詳見“2、配套源碼”),大家感興趣可以自行Down下去玩玩。
在我所撰寫的案例中,自定義協(xié)議可以繼續(xù)優(yōu)化,選擇性能更強(qiáng)的序列化方式,而聊天室也可以進(jìn)一步拓展,比如將用戶信息、群聊信息、聯(lián)系人信息都結(jié)合數(shù)據(jù)庫實(shí)現(xiàn),進(jìn)一步實(shí)現(xiàn)離線消息功能,但由于該案例的設(shè)計(jì)之初就有問題,所以是存在性能問題的,想要打造一款真正高性能的IM程序,那諸位可參考本系列前面的文章即可。
12、系列文章
《跟著源碼學(xué)IM(一):手把手教你用Netty實(shí)現(xiàn)心跳機(jī)制、斷線重連機(jī)制》
《跟著源碼學(xué)IM(二):自已開發(fā)IM很難?手把手教你擼一個(gè)Andriod版IM》
《跟著源碼學(xué)IM(三):基于Netty,從零開發(fā)一個(gè)IM服務(wù)端》
《跟著源碼學(xué)IM(四):拿起鍵盤就是干,教你徒手開發(fā)一套分布式IM系統(tǒng)》
《跟著源碼學(xué)IM(五):正確理解IM長連接、心跳及重連機(jī)制,并動(dòng)手實(shí)現(xiàn)》
《跟著源碼學(xué)IM(六):手把手教你用Go快速搭建高性能、可擴(kuò)展的IM系統(tǒng)》
《跟著源碼學(xué)IM(七):手把手教你用WebSocket打造Web端IM聊天》
《跟著源碼學(xué)IM(八):萬字長文,手把手教你用Netty打造IM聊天》
《跟著源碼學(xué)IM(九):基于Netty實(shí)現(xiàn)一套分布式IM系統(tǒng)》
《跟著源碼學(xué)IM(十):基于Netty,搭建高性能IM集群(含技術(shù)思路+源碼)》
《跟著源碼學(xué)IM(十一):一套基于Netty的分布式高可用IM詳細(xì)設(shè)計(jì)與實(shí)現(xiàn)(有源碼)》
《跟著源碼學(xué)IM(十二):基于Netty打造一款高性能的IM即時(shí)通訊程序》(* 本文)
《SpringBoot集成開源IM框架MobileIMSDK,實(shí)現(xiàn)即時(shí)通訊IM聊天功能》
13、參考資料
[1] 淺談IM系統(tǒng)的架構(gòu)設(shè)計(jì)
[2] 簡述移動(dòng)端IM開發(fā)的那些坑:架構(gòu)設(shè)計(jì)、通信協(xié)議和客戶端
[3] 一套海量在線用戶的移動(dòng)端IM架構(gòu)設(shè)計(jì)實(shí)踐分享(含詳細(xì)圖文)
[4] 一套原創(chuàng)分布式即時(shí)通訊(IM)系統(tǒng)理論架構(gòu)方案
[5] 一套億級用戶的IM架構(gòu)技術(shù)干貨(上篇):整體架構(gòu)、服務(wù)拆分等
[6] 一套億級用戶的IM架構(gòu)技術(shù)干貨(下篇):可靠性、有序性、弱網(wǎng)優(yōu)化等
[7] 史上最通俗Netty框架入門長文:基本介紹、環(huán)境搭建、動(dòng)手實(shí)戰(zhàn)
[8] 強(qiáng)列建議將Protobuf作為你的即時(shí)通訊應(yīng)用數(shù)據(jù)傳輸格式
[9] IM通訊協(xié)議專題學(xué)習(xí)(一):Protobuf從入門到精通,一篇就夠!
[10] 融云技術(shù)分享:全面揭秘億級IM消息的可靠投遞機(jī)制
[12] 零基礎(chǔ)IM開發(fā)入門(四):什么是IM系統(tǒng)的消息時(shí)序一致性?
[13] 如何保證IM實(shí)時(shí)消息的“時(shí)序性”與“一致性”?
[14] 微信的海量IM聊天消息序列號生成實(shí)踐(算法原理篇)
[15] 網(wǎng)易云信技術(shù)分享:IM中的萬人群聊技術(shù)方案實(shí)踐總結(jié)
[16] 融云IM技術(shù)分享:萬人群聊消息投遞方案的思考和實(shí)踐
[17] 為何基于TCP協(xié)議的移動(dòng)端IM仍然需要心跳保活機(jī)制?
[18] 一文讀懂即時(shí)通訊應(yīng)用中的網(wǎng)絡(luò)心跳包機(jī)制:作用、原理、實(shí)現(xiàn)思路等
[19] 微信團(tuán)隊(duì)原創(chuàng)分享:Android版微信后臺保活實(shí)戰(zhàn)分享(網(wǎng)絡(luò)保活篇)
[20] 融云技術(shù)分享:融云安卓端IM產(chǎn)品的網(wǎng)絡(luò)鏈路保活技術(shù)實(shí)踐
[21] 徹底搞懂TCP協(xié)議層的KeepAlive保活機(jī)制
[22] 深度解密釘釘即時(shí)消息服務(wù)DTIM的技術(shù)設(shè)計(jì)
(本文已同步發(fā)布于:http://www.52im.net/thread-4530-1-1.html)
作者:Jack Jiang (點(diǎn)擊作者姓名進(jìn)入Github)
出處:http://www.52im.net/space-uid-1.html
交流:歡迎加入即時(shí)通訊開發(fā)交流群 215891622
討論:http://www.52im.net/
Jack Jiang同時(shí)是【原創(chuàng)Java
Swing外觀工程BeautyEye】和【輕量級移動(dòng)端即時(shí)通訊框架MobileIMSDK】的作者,可前往下載交流。
本博文
歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明出處(也可前往 我的52im.net 找到我)。