海闊天空

          I'm on my way!
          隨筆 - 17, 文章 - 69, 評論 - 21, 引用 - 0
          數據加載中……

          一個Tcp網絡服務框架例子

          WindowsIOCPLinuxepollFreeBSDkqueue寫了一個支持高并發、多CPU、跨平臺的TCP網絡服務框架。

          測試

          下載netfrm.v2.rar,解壓縮得到netfrm.v2目錄,里面有netfrm.v2.vcprojsrc目錄。
          測試代碼在src/main.cpp

          #include <stdio.h>
          #include "./lance/ldebug.h"
          #include "./lance/tcpsrv.hpp"
          #include "./lance/systm.h"
          class MyClient : public lance::net::Client
          {
          public: void OnConnect()
                   {
                        printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);

                        recv(data, 255);
                   }
          public: void OnDisconnect()
                   {
                        printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
                   }
          public: void OnRecv(int len)
                   {
                        data[len] = 0x00;
                        printf("OnRecv: fd=%08x, data=%s\n", fd, data);

                        if (data[0] == 'a')
                        {
                             printf("user exit command\n");
                             close();
                        }
                        recv(data, 255);
                   }

          public: char data[256];
          };


          int main(char * args[])
          {
               lance::net::TCPSrv<MyClient> srv;

               srv.ip = 0;
               srv.port = 1234;
               srv.ptr = NULL;
               srv.backlogs = 10;
               srv.threads = 1;
               srv.scheds = 0;
               srv.start();

               while(true)
               {
                   lance::systm::sleep(2000);
               }

               return 0;
          }

          測試代碼綁定本機所有IP地址,在1234端口開啟網絡服務,接收客戶端發送的字符串,并將這些字符串打印到控制臺上。

          Windows平臺

          Windows XP SP2下用vs2003編譯測試通過。
          vs2003打開netfrm.v2.vcproj,然后編譯、運行,會彈出控制臺窗口。
          Windows開始菜單->運行->cmd,啟動Windows命令窗口,輸入telnet 127.0.0.1 1234,回車連接到測試網絡服務,如果一切正常,網絡服務控制臺窗口將顯示連接信息,可以在Windows命令窗口隨便輸入信息,這時網絡服務控制臺窗口將打印輸入的信息。
          如下圖所示:
          500)this.width=500;" border="0" width="500">
          1
          輸入字符a表示斷開網絡連接。

          Linux平臺

          LinuxRed Hat Enterprise Linux 4下測試通過,其他Linux平臺需要Linux 2.6及以上支持epoll的內核。
          首先轉到src目錄:
          $ cd src
          編譯:
          $ make –f Makefile.linux clean all
          這時會在當前目錄生成tcpsrv.0.1.bin的可執行文件,執行:
          $ ./ tcpsrv.0.1.bin
          再打開一個命令行窗口,測試:
          $ telnet 127.0.0.1 1234
          輸入字符串并回車,剛才執行tcpsrv.0.1.bin的窗口將打印連接和字符串信息。
          輸入a開頭的字符串將斷開連接。

          FreeBSD平臺

          FreeBSDFreeBSD 6.2下測試通過,其他BSD平臺需要支持kqueue的內核。
          首先轉到src目錄:
          $ cd src
          編譯:
          $ make –f Makefile.freebsd clean all
          這時會在當前目錄生成tcpsrv.0.1.bin的可執行文件,執行:
          $ ./ tcpsrv.0.1.bin
          再打開一個命令行窗口,測試:
          $ telnet 127.0.0.1 1234
          輸入字符串并回車,剛才執行tcpsrv.0.1.bin的窗口將打印連接和字符串信息。
          輸入a開頭的字符串將斷開連接。
           

          使用

          目錄結構:
          src
          |---lance
             |---tcpsrv.hpp 主要接口文件
             |---iocptcpsrv.hpp Windows IOCP網絡服務實現文件
             |---eptcpsrv.hpp Linux epoll網絡服務實現文件
             |---kqtcpsrv.hpp FreeBSD kqueue網絡服務實現文件
          在某種平臺下使用時,src/lance/tcpsrv.hpp必須,其他文件根據平臺而定。
           
          首先,創建一個Client類,這個類必須繼承lance::net::Client,重載事件通知方法。

          // Client對象類,當連接建立時自動創建,當連接斷開時自動銷毀
          class MyClient : public lance::net::Client
          {
                   // 連接建立時被調動
          public: void OnConnect()
                   {
                        printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
                        // 通知調度系統接收數據
                        // 數據這時并沒有真正接收,當客戶端有數據發送來時
                        // 調度器自動接收數據,然后通過OnRecv通知數據接收完成
                        recv(data, 255);
                   }
                   // 連接斷開時被調用
          public: void OnDisconnect()
                   {
                        printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
                   }
                   // 當有數據被接收時調用,接收的實際數據長度為len
          public: void OnRecv(int len)
                   {
                        data[len] = 0x00;
                        printf("OnRecv: fd=%08x, data=%s\n", fd, data);
                        // 斷開連接命令
                        if (data[0] == 'a')
                        {
                             printf("user exit command\n");
                             // 通知調度系統斷開連接,當調度系統處理完成后才真正斷開連接
                             close();
                        }
                        // 通知調度系統接收數據
                        // 數據這時并沒有真正接收,當客戶端有數據發送來時
                        // 調度器自動接收數據,然后通過OnRecv通知數據接收完成
                        recv(data, 255);
                   }
          // 數據緩沖區
          public: char data[256];
          };

           
          然后創建一個lance::net::TCPSrv<T>的實例,這個實例負責調度網絡服務。
           
          具體代碼參考src/main.cpplance::net::ClientOnConnectOnRecvOnDisconnect都由工作線程池處理,所以里面可以進行IO操作而不會影響系統響應。

          int main(char * args[])
          {
               lance::net::TCPSrv<MyClient> srv;
               
          // 設置監聽套接字綁定的IP
               
          // 0為綁定所有本機可用IP地址
               srv.ip = 0;
               
          // 監聽端口
               srv.port = 1234;
               
          // 綁定的對象或資源指針
               
          // MyClient里面可以通過srv->ptr獲取這個指針
               srv.ptr = NULL;
               
          // 監聽套接字連接隊列長度
               srv.backlogs = 10;
               
          // 處理線程池線程數
               srv.threads = 1;
               
          // 調度器線程數,通常是本機CPU數的2倍
               
          // 0表示自動選擇
               srv.scheds = 0;
               
          // 啟動網絡服務
               srv.start();
               
          // 循環,保證進程不退出
               while(true)
               {
                   lance::systm::sleep(2000);
               }
               return 0;
          }

           
          Windows平臺的預編譯宏是LANCE_WIN32
          Linux平臺的預編譯宏是LANCE_LINUX
          FreeBSD平臺的預編譯宏是LANCE_FREEBSD
           
          Windows平臺編譯需要使用WIN32_LEAN_AND_MEAN_WIN32_WINNT=0x0500預編譯宏來避免Winsock2Windows頭文件的沖突,否則會產生大量類型重定義錯誤。

          #define EPOLL_MAX_NFDS          10000    // max sockets queried by epoll.
          #define EPOLL_MAX_EVENTS        100      // max events queried by epoll.
          #define EPOLL_MAX_QUEUE         1024     // max events in cache queue.

           
          Linux平臺有額外三個預編譯宏,參考src/lance/eptcpsrv.hpp
          FreeBSD平臺有額外三個預編譯宏,參考src/lance/kqtcpsrv.hpp

          #define KQUEUE_MAX_NFDS 10000 // max sockets queried by kqueue.
          #define KQUEUE_MAX_EVENTS 100 // max events queried by kqueue.
          #define KQUEUE_MAX_QUEUE 1024 // max events in cache queue.

          Windows IOCP設計

          首先用戶接口部分,由兩個類lance::net:TCPSrv<T>lance::net::Client
          lance::net::TCPSrv<T>管理監聽套接字、事件調度和事件處理。
          lance::net::Client管理連接套接字。
          lance::net::TCPSrv<T>lance::net::Listener<T>lance::net::Scheduler<T>lance::net::Processor<T>組成。
          他們之間的關系如下:
          500)this.width=500;" border="0" width="500">
          2
          Listener<T>管理監聽套接字,有單獨的線程執行,當有連接到來時,創建一個Client的對象實例,然后通過IOCP系統調用通知調度器有連接到來,參考src/lance/iocptcpsrv.hpp

          template<typename T>
          void Scheduler<T>::push(T * clt)
          {
               ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
          }

          Scheduler<T>實際并不做很多事情,只是封裝IOCP句柄,WindowsIOCP功能很豐富,包括管理事件隊列和多CPU支持,所以Scheduler只是一個IOCP的映射。
          Processor<T>管理線程池,這些線程池是工作線程,他們輪詢SchedulerIOCP,從中取出系統事件,IOCP里面有三種事件,一種是客戶端連接事件,一種是客戶端數據事件,最后一種是連接斷開事件,當有事件到來時,會得到Client對象的指針cltClientevent包含了事件類型,參考src/lance/iocptcpsrv.hpp

          template<typename T>
          DWORD WINAPI Processor<T>::run(LPVOID param)
          {
               Processor<T>& procor = *(Processor<T> *)param;
               Scheduler<T>& scheder = *procor.scheder;
               HANDLE iocp = scheder.iocp;

               DWORD ready;
               ULONG_PTR key;
               WSAOVERLAPPED * overlap;
               while (true)
               {
                   ::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE);

                   T * clt = (T *)key;
                   switch(clt->event)
                   {
                   case T::EV_RECV:
                        {
                             if (0 >= ready)
                             {
                                 clt->event = T::EV_DISCONNECT;
                                 ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
                             }
                             else
                             {
                                 clt->OnRecv(ready);
                             }
                        }
                        break;
                   case T::EV_CONNECT:
                        {
                             if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0))
                             {
                                 ::closesocket(clt->fd);
                                 delete clt;
                             }
                             else
                             {
                                 clt->OnConnect();
                             }
                        }
                        break;
                   case T::EV_DISCONNECT:
                        {
                             clt->OnDisconnect();
                             ::closesocket(clt->fd);
                             delete clt;
                        }
                        break;
                   case T::EV_SEND:
                        break;
                   }
               }

               return 0;
          }

          所以Client::OnConnectClient::OnRecvClient::OnDisconnect都在工作線程中進行,這些處理過程中都可以有IO等耗時操作,一個連接的阻塞不會影響其他連接的響應速度。
           
          Client的其他方法Client::recvClient::sendClient::close
          Client::recv是一個異步接收數據的方法,這個方面只是告訴IOCP想要接收客戶端的數據,然后立即返回,由IOCP去負責接收數據,有數據收到時,Processor<T>的工作線程會收到Client::EV_RECV的消息,Processor<T>會調用Client::OnRecv進行通知。
          Client::send是發送消息的函數,這個函數是阻塞調用,等待消息發送成功后才返回。
          Client::close是主動斷開客戶端連接的方法,這個方法不會直接調用closesocket(fd),而是調用shutdown(fd)shutdown(fd)會向Scheduler<T>觸發一個Client::EV_DISCONNECT的事件,然后Processor<T>調用Client::OnDisconnect通知連接斷開,執行完Client::OnDisconnect后,由Processor<T>調用closesocket(fd)真正斷開連接,這樣設計一方面滿足任何情況下OnDisconnect都被調用,另一方面因為操作系統會重用已經關閉的套接字fd,所以只有當OnDisconnect執行完畢后才真正調用closesocket讓操作系統回收fd,可以避免使用無效的套接字或者挪用其他連接的套接字。
           

          Linux epollFreeBSD kqueue設計

          Linux epollFreeBSD kqueue的機制幾乎一樣,只有函數名字和個數不一樣,所以一起分析,并且簡寫為Linux
          因為Linux不像Windows一樣會管理事件隊列和多CPU支持,所以Linux需要額外實現事件隊列和多CPU支持。
          Linux下用戶接口跟Windows一樣,有lance::net::TCPSrv<T>lance::net::Client,因為跨平臺,所以他們提供的接口功能和意義也一樣,參考Windows一節。
          lance::net::TCPSrv<T>管理連接套接字、事件隊列、多CPU支持、事件調度和事件處理。
          lance::net::TCPSrv<T>Listener<T>Scheduler<T>Processor<T>Queue<T>組成。
          他們之間關系圖如下:
          500)this.width=500;" border="0" width="500">
          3
          Listener<T>管理監聽套接字,有連接到來時創建一個Client的實例clt,初始化Client::eventClient::EV_CONNECT,然后將clt放入調度器,調度器為clt選擇一個合適的epoll/kqueue進行綁定,然后將clt放入事件隊列Queue<T>等待被Processor<T>執行。
           
          Scheduler<T>管理epoll/kqueue,為了支持多CPU,一個Scheduler<T>可能管理多個epoll/kqueue,通過lance::net::TCPSrv::scheds進行設置,當lance::net::TCPSrv::scheds大于1時,Scheduler<T>將創建scheds個線程,每個線程管理一個epoll/kqueue。當Listener<T>提交一個新的clt時,Scheduler<T>順序選擇一個epoll/kqueue進行綁定,這是最簡單的均等選擇算法,epoll/kqueue會檢查綁定的clt的數據接收和連接斷開事件,如果有事件,會把產生這個事件的clt放入事件隊列Queue<T>等待被Processor<T>執行,并且設置clt的套接字為休眠狀態,因為epoll/kqueue為狀態觸發,如果事件在被Processor<T>處理前不休眠,會再次被觸發,這樣Queue<T>將被迅速填滿。
          CPU時,依靠多個epoll/kqueue能有效利用這些CPU
          參考eptcpsrv.hpp

          template<typename T>
          void Scheduler<T>::push(T * clt)
          {
               clt->epfd = epers[epoff].epfd;
               epoff = (epoff+1 == scheds)?0:(epoff+1);
               queue.in();
               while (queue.full())
               {
                   queue.fullWait();
               }
               if (queue.empty())
               {
                   queue.emptyNotify();
               }
               queue.push(clt);
               queue.out();
          }

           
          Queue<T>是有限緩沖隊列,有隊列最大長度EPOLL_MAX_QUEUE/KQUEUE_MAX_QUEUE,有限緩沖隊列結構如下:

          500)this.width=500;" border="0">

          4
          Queue<T>采用monitor模式,使用pthread_mutex_t lock保護臨界區,使用pthread_cond_t emptySignal做隊列由空到不空的通知,也就是喚醒消費者可以處理隊列,使用pthread_cond_t fullSignal做隊列由滿到不滿的通知,也就是喚醒生產者可以填充隊列,這里Scheduler<T>是生產者,Processor<T>是消費者。
          有時epoll/kqueue會一次產生多個事件,如果先前隊列為空,那么需要通知Processor<T>可以處理事件,因為emptySignal.notify只能一次喚醒一個線程,為了更加高效的處理事件,應該使用emptySignal.broadcast喚醒所有工作線程。
          如果epoll/kqueue一次只產生了一個事件,并且先前隊列為空,那么只需要使用emptySignal.notify喚醒一個工作線程而不應該使用emptySignal.broadcast喚醒工作線程,因為只有一個事件,所以只有一個線程會處理事件,而其他線程會空轉一次消耗資源。
          如果epoll/kqueue產生了事件,但是隊列不為空,那么不需要喚醒工作線程的操作,因為隊列不為空的時候,沒有任何工作線程處于等待狀態。
          代碼參考eptcpsrv.hpp/Queue<T>
           
          Processor<T>Windows基本一樣,Processor<T>Queue<T>取出事件,然后根據clt->event事件類型調用響應的事件通知函數。
           
          Client::recv也是一個請求接收數據的過程,并不實際接收數據,當有數據到來時,Processor<T>的工作線程負責接收數據,然后調用Client::OnRecv通知響應的連接對象。
          Cleint::send是一個同步阻塞函數,等待數據真正發送完成后再返回。
          Client::closeWindows類似,只是調用shutdown來觸發斷開消息,然后處理流程跟Windows一致。




          轉自:http://blog.chinaunix.net/u1/52224/showart_425449.html

          posted on 2009-07-27 22:09 石頭@ 閱讀(1546) 評論(0)  編輯  收藏 所屬分類: Tcp/Ip

          主站蜘蛛池模板: 固镇县| 东光县| 邵阳市| 乐山市| 南川市| 镇坪县| 德清县| 通山县| 慈利县| 鄱阳县| 抚松县| 讷河市| 盐山县| 古浪县| 九寨沟县| 岫岩| 临猗县| 类乌齐县| 伊吾县| 南丹县| 高邮市| 海原县| 芦溪县| 大兴区| 成武县| 丰原市| 嘉鱼县| 马龙县| 吴江市| 浪卡子县| 乌拉特中旗| 中阳县| 固始县| 马公市| 女性| 平谷区| 开鲁县| 客服| 二手房| 叶城县| 宜君县|