Jack Jiang

          我的最新工程MobileIMSDK:http://git.oschina.net/jackjiang/MobileIMSDK
          posts - 499, comments - 13, trackbacks - 0, articles - 1

          本文由攜程前端開發專家Chris Xia分享,關注新技術革新和研發效率提升。

          1、引言

          本文介紹了攜程機票前端基于Server-Sent Events(SSE)實現服務端推送的企業級全鏈路通用技術解決方案。文章深入探討了 SSE 技術在應用過程中包括方案對比、技術選型、鏈路層優化以及實際效果等多維度的技術細節,為類似使用場景提供普適性參考和借鑒。該方案設計目標是實現通用性,適用于各種網絡架構和業務場景。

           

          2、技術背景

          在如今互聯網應用中,實時數據推送已成為很多業務場景的關鍵技術解決方案。攜程機票業務作為在線旅游行業的核心場景,面臨著航班數據實時性要求高、信息維度復雜等挑戰。

          Server-Sent Events(SSE)技術作為一種基于 HTTP 長連接的服務器推送方案,非常適用于機票業務"服務端主動推送、客戶端實時展示"的需求特點。

          相較于 WebSocket 等雙向通信協議,SSE 在實現簡單性、協議輕量級和瀏覽器兼容性等方面具有顯著優勢,適合機票列表頁這類以服務端數據為主導的業務場景。

          3、認識SSE

          3.1 SSE 是什么?

          Server-Sent Events(SSE)服務器發送事件,是一種基于 HTTP 長連接,允許服務器單向實時推送數據到客戶端的技術。

          SSE 的工作原理非常簡單直觀。客戶端通過與服務器建立一條持久化的 HTTP 連接,然后服務器使用該連接將數據以事件流(event stream)的形式發送給客戶端。這些事件流由多個事件(event)組成,每個事件包含一個標識符、類型和數據字段。客戶端通過監聽事件流來獲取最新的數據,并在接收到事件后進行處理。

          關于SSE技術的詳細介紹可以閱讀SSE技術詳解:一種全新的HTML5服務器推送事件技術》。

          3.2 SSE 的使用場景

          SSE 使用場景非常廣泛,大家熟知的 Chatgpt 對話的交互形式使用的就是 SSE 技術。

          SSE 在服務器單向實時推送數據的場景非常適用:

          • 1)實時數據流:如股票市場更新、新聞推送、體育比分更新等;
          • 2)實時通知:如社交媒體消息提醒、新訂單通知等;
          • 3)儀表盤更新:如系統監控、實時數據統計等。

          關于SSE在如今熱門的AI大模型技術的中的應用可以閱讀:全民AI時代,大模型客戶端和服務端的實時通信到底用什么協議?》、《大模型時代多模型AI網關的架構設計與實現》。

          4、先說效果

          機票前端首次在核心業務中(機票航班列表)使用 SSE 技術,機票列表頁由原先客戶端串行請求獲取多批次航班數據變為一次請求由服務持續推送數據給客戶端。

          在調研了公司內外各種實現方案,最終聯合攜程框架、SRE、機票前后端團隊共同實現了全公司通用的SSE技術解決方案(詳情見下文中的全鏈路支持部分)。

          1)使用 SSE 前(如下圖):

          • 1)客戶端需要發起兩次請求獲取完整航班數據;
          • 2)服務端采用預取優化:在響應第一次請求時,提前獲取第二批數據并緩存至 Redis(降低客戶端第二次請求響應的耗時);
          • 3)客戶端發起第二次請求時,可直接獲取緩存數據。

          這樣的流程和技術方案無疑會提升前后端的代碼復雜度,服務端需要額外增加一層緩存來提升響應時間,客戶端無法感知服務到底有多少批次數據,需要不斷問詢。

          2)使用 SSE 后(如下圖):

          客戶端發送一次 SSE 請求,服務端實時推送數據到客戶端,服務間上下游同樣采用流式傳輸,實現客戶端到服務端全鏈路流式通信。

          3)SSE 為前后端帶來的價值:

          • 1)減少請求傳輸耗時:無需請求多次,減少了多次請求的傳輸耗時;
          • 2)前后端代碼結構優化:代碼更簡潔且易于理解,減少串行請求的回調監聽/嵌套;
          • 3)服務邏輯優化:列表數據移除了 redis 的發布訂閱流程,簡化了代碼架構;
          • 4)資源利用率提升:減少冗余請求(只有一批數據時,客戶端不用再次請求問詢服務)。

          4)SSE 對性能有提升嗎?

          通過分析請求流程(建立鏈接 -> 發送請求 -> 響應數據傳輸)和其原理,發現 HTTP 1.1 和 2 支持鏈路復用,因此鏈接建立的次數本質上沒有變化。在傳輸通道和數據壓縮方式保持不變的情況下,響應數據傳輸的耗時也不會有明顯變化。

          SSE 的核心性能優勢在于減少了請求發送的次數,其性能增益取決于具體的使用場景:

          a)當服務端響應耗時大于網絡傳輸耗時,性能提升有限。

          使用 SSE 與傳統串行請求的性能實驗數據對比:

          b)當網絡傳輸耗時大于服務端處理耗時,減少請求次數可以顯著降低整體延遲。

          5、方案選型

          目前市面上很多服務端推送的技術解決方案:SSE、輪詢/串行、Websocket 等,我們從易用性,資源開銷,使用場景等多維度對比了幾個使用較多的主流方案,最終選擇了 SSE。

          5.1 服務端推送

          簡單幾行代碼實現服務端SSE推送。

          SSE 的數據傳輸規范中有 4 個關鍵字段 event、data、id 和 retry,用于定義和傳輸事件數據。

          具體是:

          • 1)even:定義消息的事件類型,客戶端可以根據事件類型觸發不同的處理邏輯;
          • 2)data:消息的主體內容;
          • 3)id:為消息設置一個唯一的 ID,用于客戶端斷線重連時標識最后接收的消息;
          • 4)retry:服務端指定客戶端在連接斷開后重新連接的時間間隔(單位為毫秒)。

          這些字段共同構成了 SSE 消息的基本格式,每條消息以兩個換行符 \n\n 結束,確保客戶端能夠正確解析和處理事件數據。

          前端使用樣例:

          // 創建 EventSource

          const evtSource = new EventSource("接口地址");

           

          // 監聽服務端推送的數據

          evtSource.onmessage = function (event) {

            console.log("接收到的消息:", event);

          };

           

          // 監聽連接建立

          evtSource.onopen = function () {

            console.log("連接已建立");

          };

           

          // 監聽報錯

          evtSource.onerror = function (err) {

            console.error("發生異常:", err);

          };

          服務使用樣例(以 Nodejs 為例):

          const http = require("http");

           

          http

          .createServer((req, res) => {

              // 設置Response Header

              res.writeHead(200, {

                "Content-Type": "text/event-stream",

                "Cache-Control": "no-cache",

                Connection: "keep-alive",

              });

              // 不斷推送數據給客戶端

              const pushData = setInterval(() => {

                res.write(data);

              }, 1000);

           

              req.on("close", () => clearInterval(pushData));

            })

            .listen(3000);

          5.2 內部SSE實踐方案

          調研發現公司內部有兩套實踐方案:

          • 1)自定義響應式網關:實現網關輪詢服務批量獲取數據,從而實現流式傳輸。繞開公司鏈路層,沒有通用性;
          • 2)前端輪詢下沉BFF(服務):前端與BFF建立SSE通道,BFF不斷輪詢向上游批量獲取數據。輪詢位置發生變化,并未實現全鏈路的流式通信。

          在攜程企業級網絡生態架構下,從通用性和完整度分析對比了兩套方案,并沒有真正意義上從前到后打通整條鏈路。

          僅僅只是簡單接入SSE是遠遠不夠的,離不開全鏈路(SSE技術選型,多層網絡架構的適配,服務間的流式通信等等)的支持,所以最終決定聯合攜程框架、SRE、機票前后端團隊共同來實現對SSE全鏈路的適配,真正意義上實現全公司通用的普適方案。

          5.3 SSE技術選型

          確定好整體技術方案后,我們在實際測試過程中發現了 2 個 Web 原生 SSE 的局限性問題。

          具體是:

          • 1)僅支持 Get 請求:對需要傳遞一些復雜請求體的場景不友好;
          • 2)不支持自定義 http header:無法支持自定義 header 透傳,鑒權等場景,目前市面大部分解決方案是使用 Cookie 來攜帶自定義參數。

          針對上述問題,調研發現微軟開源的 SSE 網絡庫 @microsoft/fetch-event-source(以下簡稱 fes)能夠很好的解決。fes 是基于 Fetch 和 ReadableStream 來實現的 SSE 功能,旨在提供更加靈活便利的調用方式。

          原生 SSE 和 fes 的對比:

          fetch-event-source 詳解:fes 的核心原理是通過 Fetch 發送請求,ReadableStream 讀取響應流,在 JS 側實現字節流數據的解析。通過對比原生 SSE(chromium 內核中 EventSource)和 fes 的代碼,發現整體流程與實現方案大致相同,關鍵區別在于流的解析,原生 SSE 在瀏覽器內核由 C++實現,fes 在 JS 側實現。

          fes 的流解析:

          • 1)核心方法:getBytes、getLines  和  getMessages;
          • 2)getBytes:通過 ReadableStream 讀取響應字節流,獲取每個字節塊;
          • 3)getLines:將 getBytes 獲取到的字節塊解析為 EventSource 行緩沖區,處理這些字節塊并解析為行,然后調用  onLine  回調函數處理每一行;
          • 4)getMessages:創建 EventSourceMessage 對象,將行緩沖區數據解析并進行組裝,處理完成后回調給調用方。

          export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) {

              const reader = stream.getReader();

              let result: ReadableStreamDefaultReadResult<Uint8Array>;

              while (!(result = await reader.read()).done) {

                  onChunk(result.value);

              }

          }

          export function getMessages(

              onId: (id: string) => void,

              onRetry: (retry: number) => void,

              onMessage?: (msg: EventSourceMessage) => void

          ) {

              let message = newMessage();

              const decoder = new TextDecoder();

           

              // return a function that can process each incoming line buffer:

              return function onLine(line: Uint8Array, fieldLength: number) {

                  if (line.length === 0) {

                      // empty line denotes end of message. Trigger the callback and start a new message:

                      onMessage?.(message);

                      message = newMessage();

                  } else if (fieldLength > 0) { // exclude comments and lines with no values

                      // line is of format "<field>:<value>" or "<field>: <value>"

                      // [url=https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation]https://html.spec.whatwg.org/mul ... ream-interpretation[/url]

                      const field = decoder.decode(line.subarray(0, fieldLength));

                      const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);

                      const value = decoder.decode(line.subarray(valueOffset));

           

                      switch (field) {

                          case 'data':

                              // if this message already has data, append the new value to the old.

                              // otherwise, just set to the new value:

                              message.data = message.data

                                  ? message.data + '\n' + value

                                  : value;

                              break;

                          case 'event':

                              message.event = value;

                              break;

                          case 'id':

                              onId(message.id = value);

                              break;

                          case 'retry':

                              const retry = parseInt(value, 10);

                              if (!isNaN(retry)) {

                                  onRetry(message.retry = retry);

                              }

                              break;

                      }

                  }

              }

          }

          export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) {

              let buffer: Uint8Array | undefined;

              let position: number; // current read position

              let fieldLength: number; // length of the `field` portion of the line

              let discardTrailingNewline = false;

           

              return function onChunk(arr: Uint8Array) {

                  if (buffer === undefined) {

                      buffer = arr;

                      position = 0;

                      fieldLength = -1;

                  } else {

                      buffer = concat(buffer, arr);

                  }

           

                  const bufLength = buffer.length;

                  let lineStart = 0; // index where the current line starts

                  while (position < bufLength) {

                      if (discardTrailingNewline) {

                          if (buffer[position] === ControlChars.NewLine) {

                              lineStart = ++position; // skip to next char

                          }

           

                          discardTrailingNewline = false;

                      }

           

                      let lineEnd = -1; // index of the \r or \n char

                      for (; position < bufLength && lineEnd === -1; ++position) {

                          switch (buffer[position]) {

                              case ControlChars.Colon:

                                  if (fieldLength === -1) { // first colon in line

                                      fieldLength = position - lineStart;

                                  }

                                  break;

                              case ControlChars.CarriageReturn:

                                  discardTrailingNewline = true;

                              case ControlChars.NewLine:

                                  lineEnd = position;

                                  break;

                          }

                      }

           

                      if (lineEnd === -1) {

                          break;

                      }

           

                      onLine(buffer.subarray(lineStart, lineEnd), fieldLength);

                      lineStart = position; // we're now on the next line

                      fieldLength = -1;

                  }

           

                  if (lineStart === bufLength) {

                      buffer = undefined; // we've finished reading it

                  } else if (lineStart !== 0) {

                      buffer = buffer.subarray(lineStart);

                      position -= lineStart;

                  }

              }

          }

          6、全鏈路打通

          企業級應用時,在非直連多層網絡架構的環境下,應用SSE不僅需要考慮前后端的使用,還需要考慮鏈路層、框架層、數據層等多環節的支持。通過不同團隊(如框架、SRE、機票前端和后端團隊)的協作,開發出一個在公司范圍內通用的解決方案。

          6.1 鏈路層

          在攜程海外上云、多地多活服務架構、多層網絡架構的背景下,攜程框架及SRE團隊提供了大力支持,完整打通了各鏈路層之間的流式傳輸。

          多層網絡架構:

          • 1)7層加速節點(akamai/aws):提供全球范圍內的快速數據傳輸;
          • 2)流量接入層(slb):確保高可用性和負載均衡;
          • 3)中間轉發節點(蟲洞):優化跨Region數據傳輸路徑,減少延遲;
          • 4)sidecar(envoy/nginx):容器流量管理,增強了應用的可維護性和擴展性。

          對于絕大部分負載均衡,一般只保證完整報文的交付,并不保證報文的交付形式(流式/聚合),聚合場景下會導致"數據碎片"被聚合再交付,無法實現流式分批傳輸(如下圖所示)。

          以Nginx為例:Nginx 會緩存代理服務器的響應(聚合類型),服務推送的數據被 Nginx 緩存到緩沖區,導致客戶端沒有實時收到數據,而是等到服務所有數據推送完后,客戶端才一次性收到了所有數據。

          適配方案:禁用緩存功能,服務端響應時除了設置 SSE 所必須的 Response Header 外,還需要添加非標 Header:X-Accel-Buffering: no,告知 Nginx 不緩存響應,確保數據實時發送到客戶端。

          值得注意的是:在多層網絡架構的環境下 X-Accel-Buffering: no Header 在各層網關之間轉發時會丟失,所以在多層網絡架構下 Nginx 需要添加 proxy_pass_header X-Accel-Buffering,來確保整條鏈路上 Header 的傳遞。

          6.2 框架層

          前端框架團隊基于fes實現SSE網絡請求,合并到公司基礎網絡框架,共享網絡優化,監控等基建能力,全公司通用。服務端基于Reactor + Dubbo Streaming實現服務間上下游全鏈路響應式流式傳輸。

          通過鏈路層的支持,從前端到服務端實現了統一的全鏈路流式傳輸通信,確保數據的高效傳輸和處理。

          6.3 數據層

          數據傳輸需注意代理服務器或 Web 容器(Nginx、Tomcat)對SSE MIME Type:text/event-stream的支持,未正確配置,服務端推送的數據不會經過任何壓縮,傳輸數據大,導致客戶端響應耗時增加。

          適配方案:根據不同的服務器類型進行配置。

          Nginx:

          Tomcat:

          7、全鏈路打通

          本文介紹了 SSE 在攜程機票前端全鏈路企業級應用實踐,解決了服務向前端實時推送數據的問題。

          通過合理的技術選型、流式數據解析和鏈路傳輸層優化,從鏈路層,框架層,數據層全鏈路實現全公司通用的普適方案。降低了前后端代碼復雜度,提升了資源利用率。

          隨著流式通信技術的不斷發展,SSE 將在更多場景中(覆蓋更多客戶端,支持更多網絡協議)發揮重要作用,為實時數據處理提供更高效的解決方案。

          8、參考資料

          [1] 新手入門貼:史上最全Web端即時通訊技術原理詳解

          [2] Web端即時通訊技術盤點:短輪詢、Comet、Websocket、SSE

          [3] SSE技術詳解:一種全新的HTML5服務器推送事件技術

          [4] 使用WebSocket和SSE技術實現Web端消息推送

          [5] 詳解Web端通信方式的演進:從Ajax、JSONP 到 SSE、Websocket

          [6] 網頁端IM通信技術快速入門:短輪詢、長輪詢、SSE、WebSocket

          [7] 搞懂現代Web端即時通訊技術一文就夠:WebSocket、socket.io、SSE

          [8] 全民AI時代,大模型客戶端和服務端的實時通信到底用什么協議?

          [9] 大模型時代多模型AI網關的架構設計與實現

          9、更多Web端即時通訊技術

          一文讀懂前端技術演進:盤點Web前端20年的技術變遷史

          Comet技術詳解:基于HTTP長連接的Web端實時通信技術

          新手快速入門:WebSocket簡明教程

          理論聯系實際:從零理解WebSocket的通信原理、協議格式、安全性

          WebSocket從入門到精通,半小時就夠!

          LinkedIn的Web端即時通訊實踐:實現單機幾十萬條長連接

          Web端即時通訊技術的發展與WebSocket、Socket.io的技術實踐

          長連接網關技術專題(四):愛奇藝WebSocket實時推送網關技術實踐

          Web端即時通訊實踐干貨:如何讓你的WebSocket斷網重連更快速?


          (本文已同步發布于:http://www.52im.net/thread-4832-1-1.html)



          作者:Jack Jiang (點擊作者姓名進入Github)
          出處:http://www.52im.net/space-uid-1.html
          交流:歡迎加入即時通訊開發交流群 215891622
          討論:http://www.52im.net/
          Jack Jiang同時是【原創Java Swing外觀工程BeautyEye】【輕量級移動端即時通訊框架MobileIMSDK】的作者,可前往下載交流。
          本博文 歡迎轉載,轉載請注明出處(也可前往 我的52im.net 找到我)。


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          Jack Jiang的 Mail: jb2011@163.com, 聯系QQ: 413980957, 微信: hellojackjiang
          主站蜘蛛池模板: 比如县| 大冶市| 敦煌市| 华宁县| 浦城县| 藁城市| 静安区| 额敏县| 山丹县| 辉南县| 昭苏县| 勃利县| 株洲县| 台东县| 苏尼特右旗| 望奎县| 沂源县| 昌邑市| 微山县| 夏津县| 安溪县| 奉新县| 长乐市| 新野县| 云南省| 信宜市| 城口县| 远安县| 依兰县| 盖州市| 莫力| 鹤山市| 万年县| 固原市| 施秉县| 如皋市| 皮山县| 梨树县| 东兴市| 彰化县| 兴安盟|