Jack Jiang

          我的最新工程MobileIMSDK:http://git.oschina.net/jackjiang/MobileIMSDK
          posts - 503, comments - 13, trackbacks - 0, articles - 1

          本文作者芋艿,原題“芋道 Spring Boot WebSocket 入門”,本次有修訂和改動。

          一、引言

          WebSocket如今在Web端即時通訊技術應用里使用廣泛,不僅用于傳統(tǒng)PC端的網(wǎng)頁里,也被很多移動端開發(fā)者用于基于HTML5的混合APP里。對于想要在基于Web的應用里添加IM、推送等實時通信功能,WebSocket幾乎是必須要掌握的技術。

          本文將基于Tomcat和Spring框架實現(xiàn)一個邏輯簡單的入門級IM應用,對于即時通訊初學者來說,能找到一個簡單直接且能順利跑通的實例代碼,顯然意義更大,本文正是如此。希望能給你的IM開發(fā)和學習帶來啟發(fā)。

          注:源碼在本文第四、五節(jié)開頭的附件處可下載。

          學習交流:

          - 即時通訊/推送技術開發(fā)交流5群:215477170 [推薦]

          - 移動端IM開發(fā)入門文章:《新手入門一篇就夠:從零開發(fā)移動端IM

          - 開源IM框架源碼:https://github.com/JackJiang2011/MobileIMSDK

          (本文同步發(fā)布于:http://www.52im.net/thread-3483-1-1.html

          二、知識準備

          如果你對Web端即時通訊知識一頭霧水,務必先讀:《新手入門貼:史上最全Web端即時通訊技術原理詳解》、《Web端即時通訊技術盤點:短輪詢、Comet、Websocket、SSE》。

          限于篇幅,本文不會深究WebSocket技術理論,如有興趣請從基礎學習:

          如果想要更硬核一點的,可以讀讀下面這幾篇:

          三、內容概述

          相比 HTTP 協(xié)議來說,WebSocket 協(xié)議對大多數(shù)后端開發(fā)者是比較陌生的。

          相對而言:WebSocket 協(xié)議重點是提供了服務端主動向客戶端發(fā)送數(shù)據(jù)的能力,這樣我們就可以完成實時性較高的需求。例如:聊天 IM 即使通訊功能、消息訂閱服務、網(wǎng)頁游戲等等。

          同時:因為 WebSocket 使用 TCP 通信,可以避免重復創(chuàng)建連接,提升通信質量和效率。例如:美團的長連接服務,具體可以看看 《美團點評的移動端網(wǎng)絡優(yōu)化實踐:大幅提升連接成功率、速度等》 。

          友情提示:

          這里有個誤區(qū),WebSocket 相比普通的 Socket 來說,僅僅是借助 HTTP 協(xié)議完成握手,創(chuàng)建連接。后續(xù)的所有通信,都和 HTTP 協(xié)議無關。

          看到這里,大家一定以為又要開始嗶嗶 WebSocket 的概念。哈哈,我偏不~如果對這塊不了的朋友,可以閱讀本文“2、知識準備”這一章。

          要想使用WebSocket,一般有如下幾種解決方案可選:

          目前筆者手頭有個涉及到 IM 即使通訊的項目,采用的是方案三。

          主要原因是:我們對 Netty 框架的實戰(zhàn)、原理與源碼,都相對熟悉一些,所以就考慮了它。并且,除了需要支持 WebSocket 協(xié)議,我們還想提供原生的 Socket 協(xié)議。

          如果僅僅是僅僅提供 WebSocket 協(xié)議的支持,可以考慮采用方案一或者方案二,在使用上,兩個方案是比較接近的。相比來說,方案一 Spring WebSocket 內置了對 STOMP 協(xié)議的支持。

          不過:本文還是采用方案二“Tomcat WebSocket”來作為入門示例。咳咳咳,沒有特殊的原因,主要是開始寫本文之前,已經(jīng)花了 2 小時使用它寫了一個示例。實在是有點懶,不想改。如果能重來,我要選李白,哈哈哈哈~

          當然,不要慌,方案一和方案二的實現(xiàn)代碼,真心沒啥差別。

          在開始搭建 Tomcat WebSocket 入門示例之前,我們先來了解下 JSR-356 規(guī)范,定義了 Java 針對 WebSocket 的 API :即 Javax WebSocket 。規(guī)范是大哥,打死不會提供實現(xiàn),所以 JSR-356 也是如此。目前,主流的 Web 容器都已經(jīng)提供了 JSR-356 的實現(xiàn),例如說 Tomcat、Jetty、Undertow 等等。

          四、Tomcat WebSocket 實戰(zhàn)入門

          4.1、基本介紹

          示例代碼下載:

          (因附件無法上傳到此處,請從同步鏈接處下載:http://www.52im.net/thread-3483-1-1.html

          代碼目錄內容是這樣: 

          在本小節(jié)中,我們會使用 Tomcat WebSocket 搭建一個 WebSocket 的示例。

          提供如下消息的功能支持:

          • 1)身份認證請求;
          • 2)私聊消息;
          • 3)群聊消息。

          考慮到讓示例更加易懂,我們先做成全局有且僅有一個大的聊天室,即建立上 WebSocket 的連接,都自動動進入該聊天室。

          下面,開始遨游 WebSocket 這個魚塘...

          4.2、引入依賴

          在 pom.xml 文件中,引入相關依賴。

          <?xml version="1.0"encoding="UTF-8"?>

          <project xmlns="http://maven.apache.org/POM/4.0.0"

                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 [url=http://maven.apache.org/xsd/maven-4.0.0.xsd]http://maven.apache.org/xsd/maven-4.0.0.xsd[/url]">

              <parent>

                  <groupId>org.springframework.boot</groupId>

                  <artifactId>spring-boot-starter-parent</artifactId>

                  <version>2.1.10.RELEASE</version>

                  <relativePath/> <!-- lookup parent from repository -->

              </parent>

              <modelVersion>4.0.0</modelVersion>

              <artifactId>lab-25-01</artifactId>

              <dependencies>

                  <!-- 實現(xiàn)對 WebSocket 相關依賴的引入,方便~ -->

                  <dependency>

                      <groupId>org.springframework.boot</groupId>

                      <artifactId>spring-boot-starter-websocket</artifactId>

                  </dependency>

                  <!-- 引入 Fastjson ,實現(xiàn)對 JSON 的序列化,因為后續(xù)我們會使用它解析消息 -->

                  <dependency>

                      <groupId>com.alibaba</groupId>

                      <artifactId>fastjson</artifactId>

                      <version>1.2.62</version>

                  </dependency>

              </dependencies>

          </project>

          具體每個依賴的作用,自己認真看下注釋。

          4.3、WebsocketServerEndpoint

          在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,創(chuàng)建 WebsocketServerEndpoint 類,定義 Websocket 服務的端點(EndPoint)。

          代碼如下:

          // WebsocketServerEndpoint.java

          @Controller

          @ServerEndpoint("/")

          public class WebsocketServerEndpoint {

              private Logger logger = LoggerFactory.getLogger(getClass());

              @OnOpen

              public void onOpen(Session session, EndpointConfig config) {

                  logger.info("[onOpen][session({}) 接入]", session);

              }

              @OnMessage

              public void onMessage(Session session, String message) {

                  logger.info("[onOpen][session({}) 接收到一條消息({})]", session, message); // 生產環(huán)境下,請設置成 debug 級別

              }

              @OnClose

              public void onClose(Session session, CloseReason closeReason) {

                  logger.info("[onClose][session({}) 連接關閉。關閉原因是({})}]", session, closeReason);

              }

              @OnError

              public void onError(Session session, Throwable throwable) {

                  logger.info("[onClose][session({}) 發(fā)生異常]", session, throwable);

              }

          }

          如代碼所示:

          • 1)在類上,添加 @Controller 注解,保證創(chuàng)建一個 WebsocketServerEndpoint Bean;
          • 2)在類上,添加 JSR-356 定義的 @ServerEndpoint 注解,標記這是一個 WebSocket EndPoint ,路徑為 / ;
          • 3)WebSocket 一共有四個事件,分別對應使用 JSR-356 定義的 @OnOpen@OnMessage@OnClose@OnError 注解。

          這是最簡版的 WebsocketServerEndpoint 的代碼。在下文,我們會慢慢把代碼補全。

          4.4、WebSocketConfiguration

          在 cn.iocoder.springboot.lab24.springwebsocket.config 包路徑下,創(chuàng)建 WebsocketServerEndpoint 配置類。

          代碼如下:

          // WebSocketConfiguration.java

          @Configuration

          // @EnableWebSocket // 無需添加該注解,因為我們并不是使用 Spring WebSocket

          public class WebSocketConfiguration {

              @Bean

              public ServerEndpointExporter serverEndpointExporter() {

                  return new ServerEndpointExporter();

              }

          }

          PS:在 #serverEndpointExporter() 方法中,創(chuàng)建 ServerEndpointExporter Bean 。該 Bean 的作用,是掃描添加有 @ServerEndpoint 注解的 Bean 。

          4.5、Application

          創(chuàng)建 Application.java 類,配置 @SpringBootApplication 注解即可。

          代碼如下:

          // Application.java

          @SpringBootApplication

          public class Application {

              public static void main(String[] args) {

                  SpringApplication.run(Application.class, args);

              }

          }

          執(zhí)行 Application 啟動該示例項目。

          考慮到大家可能不會或者不愿意寫前端代碼,所以我們直接使用 WebSocket在線測試工具,測試 WebSocket 連接。

          如下圖:

          至此,最簡單的一個 WebSocket 項目的骨架,我們已經(jīng)搭建完成。下面,我們開始改造,把相應的邏輯補全。

          4.6、消息

          在 HTTP 協(xié)議中,是基于 Request/Response 請求響應的同步模型,進行交互。在 Websocket 協(xié)議中,是基于 Message 消息的異步模型,進行交互。這一點,是很大的不同的,等會看到具體的消息類,感受會更明顯。

          因為 WebSocket 協(xié)議,不像 HTTP 協(xié)議有 URI 可以區(qū)分不同的 API 請求操作,所以我們需要在 WebSocket 的 Message 里,增加能夠標識消息類型,這里我們采用 type 字段。

          所以在這個示例中,我們采用的 Message 采用 JSON 格式編碼。

          格式如下:

          {

              type: "", // 消息類型

              body: {} // 消息體

          }

          解釋一下:

          • 1)type 字段,消息類型。通過該字段,我們知道使用哪個 MessageHandler 消息處理器(關于 MessageHandler ,我們在下一節(jié)中,詳細解析);
          • 2)body 字段,消息體。不同的消息類型,會有不同的消息體;
          • 3)Message 采用 JSON 格式編碼,主要考慮便捷性,實際項目下,也可以考慮 Protobuf 等更加高效且節(jié)省流量的編碼格式。

          實際上:我們在該示例中,body 字段對應的 Message 相關的接口和類,實在想不到名字了。所有的 Message 們,我們都放在 cn.iocoder.springboot.lab25.springwebsocket.message 包路徑下。

          4.6.1 Message

          創(chuàng)建 Message 接口,基礎消息體,所有消息體都要實現(xiàn)該接口。

          代碼如下:

          // Message.java

          publicinterfaceMessage {

          }

          目前作為一個標記接口,未定義任何操作。

          4.6.2 認證相關 Message

          創(chuàng)建 AuthRequest 類,用戶認證請求。

          代碼如下:

          // AuthRequest.java

          public class AuthRequest implements Message {

              public static final String TYPE = "AUTH_REQUEST";

              /**

               * 認證 Token

               */

              private String accessToken;

              // ... 省略 set/get 方法

          }

          解釋一下:

          • 1)TYPE 靜態(tài)屬性,消息類型為 AUTH_REQUEST 。
          • 2)accessToken 屬性,認證 Token 。

          對于第2)點,在 WebSocket 協(xié)議中,我們也需要認證當前連接,用戶身份是什么。一般情況下,我們采用用戶調用 HTTP 登錄接口,登錄成功后返回的訪問令牌 accessToken 。這里,我們先不拓展開講,事后可以看看 《基于 Token 認證的 WebSocket 連接》 文章。

          雖然說,WebSocket 協(xié)議是基于 Message 模型,進行交互。但是,這并不意味著它的操作,不需要響應結果。例如說,用戶認證請求,是需要用戶認證響應的。所以,我們創(chuàng)建 AuthResponse 類,作為用戶認證響應。

          代碼如下:

          // AuthResponse.java

          public class AuthResponse implements Message {

              public static final String TYPE = "AUTH_RESPONSE";

              /**

               * 響應狀態(tài)碼

               */

              private Integer code;

              /**

               * 響應提示

               */

              private String message;

              // ... 省略 set/get 方法

          }

          解釋一下:

          • 1)TYPE 靜態(tài)屬性,消息類型為 AUTH_REQUEST ;
          • 2)code 屬性,響應狀態(tài)碼;
          • 3)message 屬性,響應提示。

          對于第1)點,實際上,我們在每個 Message 實現(xiàn)類上,都增加了 TYPE 靜態(tài)屬性,作為消息類型。下面,我們就不重復贅述了。

          在本示例中,用戶成功認證之后,會廣播用戶加入群聊的通知 Message ,使用 UserJoinNoticeRequest 。

          代碼如下:

          // UserJoinNoticeRequest.java

          public class UserJoinNoticeRequest implements Message {

              public static final String TYPE = "USER_JOIN_NOTICE_REQUEST";

              /**

               * 昵稱

               */

              private String nickname;

              // ... 省略 set/get 方法

          }

          實際上,我們可以在需要使用到 Request/Response 模型的地方,將 Message 進行拓展:

          • 1)Request 抽象類,增加 requestId 字段,表示請求編號;
          • 2)Response 抽象類,增加 requestId 字段,和每一個 Request 請求映射上(同時,里面統(tǒng)一定義 code 和 message 屬性,表示響應狀態(tài)碼和響應提示)。

          這樣,在使用到同步模型的業(yè)務場景下,Message 實現(xiàn)類使用 Request/Reponse 作為后綴。例如說,用戶認證請求、刪除一個好友請求等等。

          而在使用到異步模型能的業(yè)務場景下,Message 實現(xiàn)類還是繼續(xù) Message 作為后綴。例如說,發(fā)送一條消息,用戶操作完后,無需阻塞等待結果

          4.6.3 發(fā)送消息相關 Message

          創(chuàng)建 SendToOneRequest 類,發(fā)送給指定人的私聊消息的 Message。

          代碼如下:

          // SendToOneRequest.java

          public class SendToOneRequest implements Message {

              public static final String TYPE = "SEND_TO_ONE_REQUEST";

              /**

               * 發(fā)送給的用戶

               */

              private String toUser;

              /**

               * 消息編號

               */

              private String msgId;

              /**

               * 內容

               */

              private String content;

              // ... 省略 set/get 方法

          }

          每個字段,自己看注釋噢。

          創(chuàng)建 SendToAllRequest 類,發(fā)送給所有人的群聊消息的 Message。

          代碼如下:

          // SendToAllRequest.java

          public class SendToAllRequest implements Message {

              public static final String TYPE = "SEND_TO_ALL_REQUEST";

              /**

               * 消息編號

               */

              private String msgId;

              /**

               * 內容

               */

              private String content;

              // ... 省略 set/get 方法

          }

          每個字段,自己看注釋噢。

          在服務端接收到發(fā)送消息的請求,需要異步響應發(fā)送是否成功。所以,創(chuàng)建 SendResponse 類,發(fā)送消息響應結果的 Message 。

          代碼如下:

          // SendResponse.java

          public class SendResponse implements Message {

              public static final String TYPE = "SEND_RESPONSE";

              /**

               * 消息編號

               */

              private String msgId;

              /**

               * 響應狀態(tài)碼

               */

              private Integer code;

              /**

               * 響應提示

               */

              private String message;

              // ... 省略 set/get 方法

          }

          重點看 msgId 字段:即消息編號。客戶端在發(fā)送消息,通過使用 UUID 算法,生成全局唯一消息編號(唯一ID的生成技術見:《從新手到專家:如何設計一套億級消息量的分布式IM系統(tǒng)》的“5、唯一ID的技術方案”章節(jié))。這樣,服務端通過 SendResponse 消息響應,通過 msgId 做映射。

          在服務端接收到發(fā)送消息的請求,需要轉發(fā)消息給對應的人。所以,創(chuàng)建 SendToUserRequest 類,發(fā)送消息給一個用戶的 Message 。

          代碼如下:

          // SendResponse.java

          public class SendToUserRequest implements Message {

              public static final String TYPE = "SEND_TO_USER_REQUEST";

              /**

               * 消息編號

               */

              private String msgId;

              /**

               * 內容

               */

              private String content;

              // ... 省略 set/get 方法

          }

          相比 SendToOneRequest 來說,少一個 toUser 字段。因為,我們可以通過 WebSocket 連接,已經(jīng)知道發(fā)送給誰了。

          4.7、消息處理器

          每個客戶端發(fā)起的 Message 消息類型,我們會聲明對應的 MessageHandler 消息處理器。這個就類似在 SpringMVC 中,每個 API 接口對應一個 Controller 的 Method 方法。

          所有的 MessageHandler 們,我們都放在 cn.iocoder.springboot.lab25.springwebsocket.handler 包路徑下。

          4.7.1 MessageHandler

          創(chuàng)建 MessageHandler 接口,消息處理器接口。

          代碼如下:

          // MessageHandler.java

          public interface MessageHandler<T extends Message> {

              /**

               * 執(zhí)行處理消息

               *

               * @param session 會話

               * @param message 消息

               */

              void execute(Session session, T message);

              /**

               * @return 消息類型,即每個 Message 實現(xiàn)類上的 TYPE 靜態(tài)字段

               */

              String getType();

          }

          解釋一下:

          • 1)定義了泛型 <T> ,需要是 Message 的實現(xiàn)類;
          • 2)定義的兩個接口方法,自己看下注釋哈。

          4.7.2 AuthMessageHandler

          創(chuàng)建 AuthMessageHandler 類,處理 AuthRequest 消息。

          代碼如下:

          // AuthMessageHandler.java

          @Component

          public class AuthMessageHandler implements MessageHandler<AuthRequest> {

              @Override

              public void execute(Session session, AuthRequest message) {

                  // 如果未傳遞 accessToken

                  if(StringUtils.isEmpty(message.getAccessToken())) {

                      WebSocketUtil.send(session, AuthResponse.TYPE,

                              new AuthResponse().setCode(1).setMessage("認證 accessToken 未傳入"));

                      return;

                  }

                  // 添加到 WebSocketUtil 中

                  WebSocketUtil.addSession(session, message.getAccessToken()); // 考慮到代碼簡化,我們先直接使用 accessToken 作為 User

                  // 判斷是否認證成功。這里,假裝直接成功

                  WebSocketUtil.send(session, AuthResponse.TYPE,newAuthResponse().setCode(0));

                  // 通知所有人,某個人加入了。這個是可選邏輯,僅僅是為了演示

                  WebSocketUtil.broadcast(UserJoinNoticeRequest.TYPE,

                          newUserJoinNoticeRequest().setNickname(message.getAccessToken())); // 考慮到代碼簡化,我們先直接使用 accessToken 作為 User

              }

              @Override

              public String getType() {

                  return AuthRequest.TYPE;

              }

          }

          代碼比較簡單,跟著代碼讀讀即可。

          關于 WebSocketUtil 類,我們在「5.8、WebSocketUtil」一節(jié)中再來詳細看看。

          4.7.3 SendToOneRequest

          創(chuàng)建 SendToOneHandler 類,處理 SendToOneRequest 消息。

          代碼如下:

          // SendToOneRequest.java

          @Component

          public class SendToOneHandler implements MessageHandler<SendToOneRequest> {

              @Override

              public void execute(Session session, SendToOneRequest message) {

                  // 這里,假裝直接成功

                  SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);

                  WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);

           

                  // 創(chuàng)建轉發(fā)的消息

                  SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())

                          .setContent(message.getContent());

                  // 廣播發(fā)送

                  WebSocketUtil.send(message.getToUser(), SendToUserRequest.TYPE, sendToUserRequest);

              }

              @Override

              public String getType() {

                  return SendToOneRequest.TYPE;

              }

          }

          代碼比較簡單,跟著代碼讀讀即可。

          4.7.4 SendToAllHandler

          創(chuàng)建 SendToAllHandler 類,處理 SendToAllRequest 消息。

          代碼如下:

          // SendToAllRequest.java

          @Component

          public class SendToAllHandler implements MessageHandler<SendToAllRequest> {

              @Override

              public void execute(Session session, SendToAllRequest message) {

                  // 這里,假裝直接成功

                  SendResponse sendResponse = newSendResponse().setMsgId(message.getMsgId()).setCode(0);

                  WebSocketUtil.send(session, SendResponse.TYPE, sendResponse);

                  // 創(chuàng)建轉發(fā)的消息

                  SendToUserRequest sendToUserRequest = newSendToUserRequest().setMsgId(message.getMsgId())

                          .setContent(message.getContent());

                  // 廣播發(fā)送

                  WebSocketUtil.broadcast(SendToUserRequest.TYPE, sendToUserRequest);

              }

           

              @Override

              public String getType() {

                  return SendToAllRequest.TYPE;

              }

          }

          代碼比較簡單,跟著代碼讀讀即可。

          4.8、WebSocketUtil

          代碼在 cn.iocoder.springboot.lab25.springwebsocket.util 包路徑下。

          創(chuàng)建 WebSocketUtil 工具類,主要提供兩方面的功能:

          • 1)Session 會話的管理;
          • 2)多種發(fā)送消息的方式。

          整體代碼比較簡單,自己瞅瞅喲。

          代碼在目錄中的如下位置: 

          4.9、完善 WebsocketServerEndpoint

          在本小節(jié),我們會修改 WebsocketServerEndpoint 的代碼,完善其功能。

          4.9.1 初始化 MessageHandler 集合

          實現(xiàn) InitializingBean 接口,在 #afterPropertiesSet() 方法中,掃描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

          代碼如下:

          // WebsocketServerEndpoint.java

          /**

           * 消息類型與 MessageHandler 的映射

           *

           * 注意,這里設置成靜態(tài)變量。雖然說 WebsocketServerEndpoint 是單例,但是 Spring Boot 還是會為每個 WebSocket 創(chuàng)建一個 WebsocketServerEndpoint Bean 。

           */

          private static final Map<String, MessageHandler> HANDLERS = newHashMap<>();

          @Autowired

          private ApplicationContext applicationContext;

          @Override

          public void afterPropertiesSet() throws Exception {

              // 通過 ApplicationContext 獲得所有 MessageHandler Bean

              applicationContext.getBeansOfType(MessageHandler.class).values() // 獲得所有 MessageHandler Bean.forEach(messageHandler -> HANDLERS.put(messageHandler.getType(), messageHandler)); // 添加到 handlers 中

              logger.info("[afterPropertiesSet][消息處理器數(shù)量:{}]", HANDLERS.size());

          }

          通過這樣的方式,可以避免手動配置 MessageHandler 與消息類型的映射。

          4.9.2 onOpen

          重新實現(xiàn) #onOpen(Session session, EndpointConfig config) 方法,實現(xiàn)連接時,使用 accessToken 參數(shù)進行用戶認證。

          代碼如下:

          // WebsocketServerEndpoint.java

          @OnOpen

          public void onOpen(Session session, EndpointConfig config) {

              logger.info("[onOpen][session({}) 接入]", session);

              // <1> 解析 accessToken

              List<String> accessTokenValues = session.getRequestParameterMap().get("accessToken");

              String accessToken = !CollectionUtils.isEmpty(accessTokenValues) ? accessTokenValues.get(0) : null;

              // <2> 創(chuàng)建 AuthRequest 消息類型

              AuthRequest authRequest = newAuthRequest().setAccessToken(accessToken);

              // <3> 獲得消息處理器

              MessageHandler<AuthRequest> messageHandler = HANDLERS.get(AuthRequest.TYPE);

              if(messageHandler == null) {

                  logger.error("[onOpen][認證消息類型,不存在消息處理器]");

                  return;

              }

              messageHandler.execute(session, authRequest);

          }

          如代碼所示:

          • <1> 處:解析 ws:// 地址上的 accessToken 的請求參。例如說:ws://127.0.0.1:8080?accessToken=999999;
          • <2> 處:創(chuàng)建 AuthRequest 消息類型,并設置 accessToken 屬性;
          • <3> 處:獲得 AuthRequest 消息類型對應的 MessageHandler 消息處理器,然后調用 MessageHandler#execute(session, message) 方法,執(zhí)行處理用戶認證請求。

          打開三個瀏覽器創(chuàng)建,分別設置服務地址如下:

          • 1)ws://127.0.0.1:8080/?accessToken=芋艿;
          • 2)ws://127.0.0.1:8080/?accessToken=番茄;
          • 3)ws://127.0.0.1:8080/?accessToken=土豆。

          然后,逐個點擊「開啟連接」按鈕,進行 WebSocket 連接。

          最終效果如下圖:

          如上圖所示:

          • 1)在紅圈中,可以看到 AuthResponse 的消息;
          • 2)在黃圈中,可以看到 UserJoinNoticeRequest 的消息。

          4.9.3 onMessage

          重新實現(xiàn) #onMessage(Session session, String message) 方法,實現(xiàn)不同的消息,轉發(fā)給不同的 MessageHandler 消息處理器。

          代碼如下:

          // WebsocketServerEndpoint.java

          @OnMessage

          public void onMessage(Session session, String message) {

              logger.info("[onOpen][session({}) 接收到一條消息({})]", session, message); // 生產環(huán)境下,請設置成 debug 級別

              try{

                  // <1> 獲得消息類型

                  JSONObject jsonMessage = JSON.parseObject(message);

                  String messageType = jsonMessage.getString("type");

                  // <2> 獲得消息處理器

                  MessageHandler messageHandler = HANDLERS.get(messageType);

                  if(messageHandler == null) {

                      logger.error("[onMessage][消息類型({}) 不存在消息處理器]", messageType);

                      return;

                  }

                  // <3> 解析消息

                  Class<? extendsMessage> messageClass = this.getMessageClass(messageHandler);

                  // <4> 處理消息

                  Message messageObj = JSON.parseObject(jsonMessage.getString("body"), messageClass);

                  messageHandler.execute(session, messageObj);

              } catch(Throwable throwable) {

                  logger.info("[onMessage][session({}) message({}) 發(fā)生異常]", session, throwable);

              }

          }

          代碼中:

          • <1> 處,獲得消息類型,從 "type" 字段中;
          • <2> 處,獲得消息類型對應的 MessageHandler 消息處理器;
          • <3> 處,調用 #getMessageClass(MessageHandler handler) 方法,通過 MessageHandler 中,通過解析其類上的泛型,獲得消息類型對應的 Class 類。

          代碼如下:

          // WebsocketServerEndpoint.java

          private Class<? extends Message> getMessageClass(MessageHandler handler) {

              // 獲得 Bean 對應的 Class 類名。因為有可能被 AOP 代理過。

              Class<?> targetClass = AopProxyUtils.ultimateTargetClass(handler);

              // 獲得接口的 Type 數(shù)組

              Type[] interfaces = targetClass.getGenericInterfaces();

              Class<?> superclass = targetClass.getSuperclass();

              while((Objects.isNull(interfaces) || 0== interfaces.length) && Objects.nonNull(superclass)) { // 此處,是以父類的接口為準

                  interfaces = superclass.getGenericInterfaces();

                  superclass = targetClass.getSuperclass();

              }

              if(Objects.nonNull(interfaces)) {

                  // 遍歷 interfaces 數(shù)組

                  for(Type type : interfaces) {

                      // 要求 type 是泛型參數(shù)

                      if(type instanceof ParameterizedType) {

                          ParameterizedType parameterizedType = (ParameterizedType) type;

                          // 要求是 MessageHandler 接口

                          if(Objects.equals(parameterizedType.getRawType(), MessageHandler.class)) {

                              Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();

                              // 取首個元素

                              if(Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {

                                  return(Class<Message>) actualTypeArguments[0];

                              } else{

                                  thrownewIllegalStateException(String.format("類型(%s) 獲得不到消息類型", handler));

                              }

                          }

                      }

                  }

              }

              throw new IllegalStateException(String.format("類型(%s) 獲得不到消息類型", handler));

          }

          這是參考 rocketmq-spring 項目的 DefaultRocketMQListenerContainer#getMessageType() 方法,進行略微修改。

          如果大家對 Java 的泛型機制沒有做過一點了解,可能略微有點硬核。可以先暫時跳過,知道意圖即可。

          <4> 處,調用 MessageHandler#execute(session, message) 方法,執(zhí)行處理請求。

          另外:這里增加了 try-catch 代碼,避免整個執(zhí)行的過程中,發(fā)生異常。如果在 onMessage 事件的處理中,發(fā)生異常,該消息對應的 Session 會話會被自動關閉。顯然,這個不符合我們的要求。例如說,在 MessageHandler 處理消息的過程中,發(fā)生一些異常是無法避免的。

          繼續(xù)基于上述創(chuàng)建的三個瀏覽器,我們先點擊「清空消息」按鈕,清空下消息,打掃下上次測試展示出來的接收得到的 Message 。當然,WebSocket 的連接,不需要去斷開。

          在第一個瀏覽器中,分別發(fā)送兩種聊天消息。

          一條 SendToOneRequest 私聊消息:

          {

              type: "SEND_TO_ONE_REQUEST",

              body: {

                  toUser: "番茄",

                  msgId: "eaef4a3c-35dd-46ee-b548-f9c4eb6396fe",

                  content: "我是一條單聊消息"

              }

          }

          一條 SendToAllHandler 群聊消息:

          {

              type: "SEND_TO_ALL_REQUEST",

              body: {

                  msgId: "838e97e1-6ae9-40f9-99c3-f7127ed64747",

                  content: "我是一條群聊消息"

              }

          }

          最終結果如下圖:

          如上圖所示:

          • 1)在紅圈中,可以看到一條 SendToUserRequest 的消息,僅有第二個瀏覽器(番茄)收到;
          • 2)在黃圈中,可以看到三條 SendToUserRequest 的消息,所有瀏覽器都收到。

          4.9.4 onClose

          重新實現(xiàn) #onClose(Session session, CloseReason closeReason) 方法,實現(xiàn)移除關閉的 Session 。

          代碼如下:

          // WebsocketServerEndpoint.java

          @OnClose

          public void onClose(Session session, CloseReason closeReason) {

              logger.info("[onClose][session({}) 連接關閉。關閉原因是({})}]", session, closeReason);

              WebSocketUtil.removeSession(session);

          }

          4.9.5 onError

          #onError(Session session, Throwable throwable) 方法,保持不變。

          代碼如下:

          // WebsocketServerEndpoint.java

          @OnError

          public void onError(Session session, Throwable throwable) {

              logger.info("[onClose][session({}) 發(fā)生異常]", session, throwable);

          }

          五、Spring WebSocket 實戰(zhàn)入門

          5.0、基礎介紹

          示例代碼下載:

          (因附件無法上傳到此處,請從同步鏈接處下載:http://www.52im.net/thread-3483-1-1.html

          仔細一個捉摸,虎軀一震,還是提供一個 Spring WebSocket 快速入門的示例。

          在 上章「Tomcat WebSocket 實戰(zhàn)入門」 的 lab-websocket-25-01 示例的基礎上,我們復制出 lab-websocket-25-02 項目,進行改造。

          改造的代碼目錄內容是這樣:

          5.1、WebSocketUtil

          因為 Tomcat WebSocket 使用的是 Session 作為會話,而 Spring WebSocket 使用的是 WebSocketSession 作為會話,導致我們需要略微修改下 WebSocketUtil 工具類。改動非常略微,點擊 WebSocketUtil.java 查看下,秒懂的噢。

          主要有兩點:

          • 1)將所有使用 Session 類的地方,調整成 WebSocketSession 類;
          • 2)將發(fā)送消息,從 Session 修改成 WebSocketSession 。

          5.2、消息處理器

          將 cn.iocoder.springboot.lab25.springwebsocket.handler 包路徑下的消息處理器們,使用到 Session 類的地方,調整成 WebSocketSession 類。

          5.3、DemoWebSocketShakeInterceptor

          在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,創(chuàng)建 DemoWebSocketShakeInterceptor 攔截器。因為 WebSocketSession 無法獲得 ws 地址上的請求參數(shù),所以只好通過該攔截器,獲得 accessToken 請求參數(shù),設置到 attributes 中。

          代碼如下:

          // DemoWebSocketShakeInterceptor.java

          public class DemoWebSocketShakeInterceptor extends HttpSessionHandshakeInterceptor {

           

              @Override// 攔截 Handshake 事件

              public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throwsException {

                  // 獲得 accessToken

                  if(request instanceof ServletServerHttpRequest) {

                      ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;

                      attributes.put("accessToken", serverRequest.getServletRequest().getParameter("accessToken"));

                  }

                  // 調用父方法,繼續(xù)執(zhí)行邏輯

                  return super.beforeHandshake(request, response, wsHandler, attributes);

              }

          }

          5.4、DemoWebSocketHandler

          在 cn.iocoder.springboot.lab25.springwebsocket.websocket 包路徑下,創(chuàng)建 DemoWebSocketHandler 處理器。該處理器參考 「5.9、完善 WebsocketServerEndpoint」 小節(jié),編寫它的代碼。

          DemoWebSocketHandler.java代碼位于如下目錄處,具體內容就不貼出來了,自已去讀一讀:

          代碼極其相似,簡單擼下即可。

          5.5、WebSocketConfiguration

          修改 WebSocketConfiguration 配置類,代碼如下:

          // WebSocketConfiguration.java

          @Configuration

          @EnableWebSocket// 開啟 Spring WebSocket

          public class WebSocketConfiguration implements WebSocketConfigurer {

              @Override

              public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

                  registry.addHandler(this.webSocketHandler(), "/") // 配置處理器

                          .addInterceptors(newDemoWebSocketShakeInterceptor()) // 配置攔截器

                          .setAllowedOrigins("*"); // 解決跨域問題

              }

           

              @Bean

              public DemoWebSocketHandler webSocketHandler() {

                  return new DemoWebSocketHandler();

              }

           

              @Bean

              public DemoWebSocketShakeInterceptor webSocketShakeInterceptor() {

                  return new DemoWebSocketShakeInterceptor();

              }

          }

          解釋一下:

          • 1)在類上,添加 @EnableWebSocket 注解,開啟 Spring WebSocket 功能;
          • 2)實現(xiàn) WebSocketConfigurer 接口,自定義 WebSocket 的配置(具體可以看看 #registerWebSocketHandlers(registry) 方法,配置 WebSocket 處理器、攔截器,以及允許跨域)。

          至此,我們已經(jīng)完成 Spring WebSocket 的示例。

          后面,我們執(zhí)行 Application 來啟動項目。具體的測試,這里就不重復了,可以自己使用 WebSocket 在線測試工具 來測試下。

          七、寫在最后

          雖然說,WebSocket 協(xié)議已經(jīng)在主流的瀏覽器上,得到非常好的支持,但是總有一些“異類”,是不兼容的。所以就誕生了 SockJS、Socket.io這類庫。關于它們的介紹與使用,可以看看 《SockJS 簡單介紹》 、《Web端即時通訊技術的發(fā)展與WebSocket、Socket.io的技術實踐》文章。

          實際場景下,我們在使用 WebSocket 還是原生 Socket 也好,都需要考慮“如何保證消息一定送達給用戶?”

          大家肯定能夠想到的是:如果用戶不處于在線的時候,消息持久化到 MySQL、MongoDB 等等數(shù)據(jù)庫中。這個是正確,且是必須要做的。

          我們在一起考慮下邊界場景:客戶端網(wǎng)絡環(huán)境較差,特別是在移動端場景下,出現(xiàn)網(wǎng)絡閃斷,可能會出現(xiàn)連接實際已經(jīng)斷開,而服務端以為客戶端處于在線的情況。此時,服務端會將消息發(fā)給客戶端,那么消息實際就發(fā)送到“空氣”中,產生丟失的情況。

          要解決這種情況下的問題,需要引入客戶端的 ACK 消息機制。

          目前,主流的有兩種做法。

          第一種:基于每一條消息編號 ACK

          整體流程如下:

          • 1)無論客戶端是否在線,服務端都先把接收到的消息持久化到數(shù)據(jù)庫中。如果客戶端此時在線,服務端將完整消息推送給客戶端;
          • 2)客戶端在接收到消息之后,發(fā)送 ACK 消息編號給服務端,告知已經(jīng)收到該消息。服務端在收到 ACK 消息編號的時候,標記該消息已經(jīng)發(fā)送成功;
          • 3)服務端定時輪詢,在線的客戶端,是否有超過 N 秒未 ACK 的消息。如果有,則重新發(fā)送消息給對應的客戶端。

          這種方案,因為客戶端逐條 ACK 消息編號,所以會導致客戶端和服務端交互次數(shù)過多。當然,客戶端可以異步批量 ACK 多條消息,從而減少次數(shù)。

          不過因為服務端仍然需要定時輪詢,也會導致服務端壓力較大。所以,這種方案基本已經(jīng)不采用了。

          第二種:基于滑動窗口 ACK

          整體流程如下:

          • 1)無論客戶端是否在線,服務端都先把接收到的消息持久化到數(shù)據(jù)庫中。如果客戶端此時在線,服務端將消息編號推送給客戶端;
          • 2)客戶端在接收到消息編號之后,和本地的消息編號進行比對。如果比本地的小,說明該消息已經(jīng)收到,忽略不處理;如果比本地的大,使用本地的消息編號,向服務端拉取大于本地的消息編號的消息列表,即增量消息列表。拉取完成后,更新消息列表中最大的消息編號為新的本地的消息編號;
          • 3)服務端在收到客戶端拉取增量的消息列表時,將請求的編號記錄到數(shù)據(jù)庫中,用于知道客戶端此時本地的最新消息編號;
          • 4)考慮到服務端將消息編號推送給客戶端,也會存在丟失的情況,所以客戶端會每 N 秒定時向服務端拉取大于本地的消息編號的消息列表。

          這種方式,在業(yè)務被稱為推拉結合的方案,在分布式消息隊列、配置中心、注冊中心實現(xiàn)實時的數(shù)據(jù)同步,經(jīng)常被采用。

          并且,采用這種方案的情況下,客戶端和服務端不一定需要使用長連接,也可以使用長輪詢所替代。

          做法比如,客戶端發(fā)送帶有消息版本號的 HTTP 請求到服務端:

          • 1)如果服務端已有比客戶端新的消息編號,則直接返回增量的消息列表;
          • 2)如果服務端沒有比客戶端新的消息編號,則 HOLD 住請求,直到有新的消息列表可以返回,或者 HTTP 請求超時;
          • 3)客戶端在收到 HTTP 請求超時時,立即又重新發(fā)起帶有消息版本號的 HTTP 請求到服務端。如此反復循環(huán),通過消息編號作為增量標識,達到實時獲取消息的目的。

          如果大家對消息可靠投遞這塊感興趣,可以看看下面這幾篇:

          畢竟,本篇這里寫的有點簡略哈 ~

          最后:如果你想系統(tǒng)的學習IM開發(fā)方面方面的知識,推薦詳讀:《新手入門一篇就夠:從零開發(fā)移動端IM》。如果你自認為已經(jīng)有點小牛x了,可以看看生產環(huán)境下的大用戶量IM系統(tǒng)架構設計方面的知識:《從新手到專家:如何設計一套億級消息量的分布式IM系統(tǒng)》。

          限于篇幅,這里就不再繼續(xù)展開了。

          附錄:更多IM開發(fā)動手實踐文章

          自已開發(fā)IM有那么難嗎?手把手教你自擼一個Andriod版簡易IM (有源碼)

          一種Android端IM智能心跳算法的設計與實現(xiàn)探討(含樣例代碼)

          手把手教你用Netty實現(xiàn)網(wǎng)絡通信程序的心跳機制、斷線重連機制

          輕量級即時通訊框架MobileIMSDK的iOS源碼(開源版)[附件下載]

          開源IM工程“蘑菇街TeamTalk”2015年5月前未刪減版完整代碼 [附件下載]

          NIO框架入門(一):服務端基于Netty4的UDP雙向通信Demo演示 [附件下載]

          NIO框架入門(二):服務端基于MINA2的UDP雙向通信Demo演示 [附件下載]

          NIO框架入門(三):iOS與MINA2、Netty4的跨平臺UDP雙向通信實戰(zhàn) [附件下載]

          NIO框架入門(四):Android與MINA2、Netty4的跨平臺UDP雙向通信實戰(zhàn) [附件下載]

          一個WebSocket實時聊天室Demo:基于node.js+socket.io [附件下載]

          適合新手:從零開發(fā)一個IM服務端(基于Netty,有完整源碼)

          拿起鍵盤就是干:跟我一起徒手開發(fā)一套分布式IM系統(tǒng)

          正確理解IM長連接的心跳及重連機制,并動手實現(xiàn)(有完整IM源碼)

          適合新手:手把手教你用Go快速搭建高性能、可擴展的IM系統(tǒng)(有源碼)

          跟著源碼一起學:手把手教你用WebSocket打造Web端IM聊天

          本文已同步發(fā)布于“即時通訊技術圈”公眾號。

          ▲ 本文在公眾號上的鏈接是:點此進入。同步發(fā)布鏈接是:http://www.52im.net/thread-3483-1-1.html 



          作者:Jack Jiang (點擊作者姓名進入Github)
          出處:http://www.52im.net/space-uid-1.html
          交流:歡迎加入即時通訊開發(fā)交流群 215891622
          討論:http://www.52im.net/
          Jack Jiang同時是【原創(chuàng)Java Swing外觀工程BeautyEye】【輕量級移動端即時通訊框架MobileIMSDK】的作者,可前往下載交流。
          本博文 歡迎轉載,轉載請注明出處(也可前往 我的52im.net 找到我)。


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導航:
           
          Jack Jiang的 Mail: jb2011@163.com, 聯(lián)系QQ: 413980957, 微信: hellojackjiang
          主站蜘蛛池模板: 巴塘县| 澎湖县| 威远县| 大连市| 甘孜| 徐闻县| 鲜城| 临沭县| 美姑县| 黄大仙区| 乳源| 本溪市| 金华市| 南通市| 厦门市| 兴和县| 霍林郭勒市| 巴东县| 乐清市| 辛集市| 鹤庆县| 尤溪县| 白沙| 南充市| 依兰县| 广南县| 东乌珠穆沁旗| 凌海市| 江口县| 公主岭市| 施秉县| 福安市| 新化县| 巧家县| 玉龙| 甘孜| 安多县| 潞西市| 壤塘县| 万州区| 黄梅县|