海闊天空

          I'm on my way!
          隨筆 - 17, 文章 - 69, 評(píng)論 - 21, 引用 - 0
          數(shù)據(jù)加載中……

          一個(gè)Tcp網(wǎng)絡(luò)服務(wù)框架例子

          WindowsIOCPLinuxepollFreeBSDkqueue寫(xiě)了一個(gè)支持高并發(fā)、多CPU、跨平臺(tái)的TCP網(wǎng)絡(luò)服務(wù)框架。

          測(cè)試

          下載netfrm.v2.rar,解壓縮得到netfrm.v2目錄,里面有netfrm.v2.vcprojsrc目錄。
          測(cè)試代碼在src/main.cpp

          #include <stdio.h>
          #include "./lance/ldebug.h"
          #include "./lance/tcpsrv.hpp"
          #include "./lance/systm.h"
          class MyClient : public lance::net::Client
          {
          public: void OnConnect()
                   {
                        printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);

                        recv(data, 255);
                   }
          public: void OnDisconnect()
                   {
                        printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
                   }
          public: void OnRecv(int len)
                   {
                        data[len] = 0x00;
                        printf("OnRecv: fd=%08x, data=%s\n", fd, data);

                        if (data[0] == 'a')
                        {
                             printf("user exit command\n");
                             close();
                        }
                        recv(data, 255);
                   }

          public: char data[256];
          };


          int main(char * args[])
          {
               lance::net::TCPSrv<MyClient> srv;

               srv.ip = 0;
               srv.port = 1234;
               srv.ptr = NULL;
               srv.backlogs = 10;
               srv.threads = 1;
               srv.scheds = 0;
               srv.start();

               while(true)
               {
                   lance::systm::sleep(2000);
               }

               return 0;
          }

          測(cè)試代碼綁定本機(jī)所有IP地址,在1234端口開(kāi)啟網(wǎng)絡(luò)服務(wù),接收客戶(hù)端發(fā)送的字符串,并將這些字符串打印到控制臺(tái)上。

          Windows平臺(tái)

          Windows XP SP2下用vs2003編譯測(cè)試通過(guò)。
          vs2003打開(kāi)netfrm.v2.vcproj,然后編譯、運(yùn)行,會(huì)彈出控制臺(tái)窗口。
          Windows開(kāi)始菜單->運(yùn)行->cmd,啟動(dòng)Windows命令窗口,輸入telnet 127.0.0.1 1234,回車(chē)連接到測(cè)試網(wǎng)絡(luò)服務(wù),如果一切正常,網(wǎng)絡(luò)服務(wù)控制臺(tái)窗口將顯示連接信息,可以在Windows命令窗口隨便輸入信息,這時(shí)網(wǎng)絡(luò)服務(wù)控制臺(tái)窗口將打印輸入的信息。
          如下圖所示:
          500)this.width=500;" border="0" width="500">
          1
          輸入字符a表示斷開(kāi)網(wǎng)絡(luò)連接。

          Linux平臺(tái)

          LinuxRed Hat Enterprise Linux 4下測(cè)試通過(guò),其他Linux平臺(tái)需要Linux 2.6及以上支持epoll的內(nèi)核。
          首先轉(zhuǎn)到src目錄:
          $ cd src
          編譯:
          $ make –f Makefile.linux clean all
          這時(shí)會(huì)在當(dāng)前目錄生成tcpsrv.0.1.bin的可執(zhí)行文件,執(zhí)行:
          $ ./ tcpsrv.0.1.bin
          再打開(kāi)一個(gè)命令行窗口,測(cè)試:
          $ telnet 127.0.0.1 1234
          輸入字符串并回車(chē),剛才執(zhí)行tcpsrv.0.1.bin的窗口將打印連接和字符串信息。
          輸入a開(kāi)頭的字符串將斷開(kāi)連接。

          FreeBSD平臺(tái)

          FreeBSDFreeBSD 6.2下測(cè)試通過(guò),其他BSD平臺(tái)需要支持kqueue的內(nèi)核。
          首先轉(zhuǎn)到src目錄:
          $ cd src
          編譯:
          $ make –f Makefile.freebsd clean all
          這時(shí)會(huì)在當(dāng)前目錄生成tcpsrv.0.1.bin的可執(zhí)行文件,執(zhí)行:
          $ ./ tcpsrv.0.1.bin
          再打開(kāi)一個(gè)命令行窗口,測(cè)試:
          $ telnet 127.0.0.1 1234
          輸入字符串并回車(chē),剛才執(zhí)行tcpsrv.0.1.bin的窗口將打印連接和字符串信息。
          輸入a開(kāi)頭的字符串將斷開(kāi)連接。
           

          使用

          目錄結(jié)構(gòu):
          src
          |---lance
             |---tcpsrv.hpp 主要接口文件
             |---iocptcpsrv.hpp Windows IOCP網(wǎng)絡(luò)服務(wù)實(shí)現(xiàn)文件
             |---eptcpsrv.hpp Linux epoll網(wǎng)絡(luò)服務(wù)實(shí)現(xiàn)文件
             |---kqtcpsrv.hpp FreeBSD kqueue網(wǎng)絡(luò)服務(wù)實(shí)現(xiàn)文件
          在某種平臺(tái)下使用時(shí),src/lance/tcpsrv.hpp必須,其他文件根據(jù)平臺(tái)而定。
           
          首先,創(chuàng)建一個(gè)Client類(lèi),這個(gè)類(lèi)必須繼承lance::net::Client,重載事件通知方法。

          // Client對(duì)象類(lèi),當(dāng)連接建立時(shí)自動(dòng)創(chuàng)建,當(dāng)連接斷開(kāi)時(shí)自動(dòng)銷(xiāo)毀
          class MyClient : public lance::net::Client
          {
                   // 連接建立時(shí)被調(diào)動(dòng)
          public: void OnConnect()
                   {
                        printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
                        // 通知調(diào)度系統(tǒng)接收數(shù)據(jù)
                        // 數(shù)據(jù)這時(shí)并沒(méi)有真正接收,當(dāng)客戶(hù)端有數(shù)據(jù)發(fā)送來(lái)時(shí)
                        // 調(diào)度器自動(dòng)接收數(shù)據(jù),然后通過(guò)OnRecv通知數(shù)據(jù)接收完成
                        recv(data, 255);
                   }
                   // 連接斷開(kāi)時(shí)被調(diào)用
          public: void OnDisconnect()
                   {
                        printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
                   }
                   // 當(dāng)有數(shù)據(jù)被接收時(shí)調(diào)用,接收的實(shí)際數(shù)據(jù)長(zhǎng)度為len
          public: void OnRecv(int len)
                   {
                        data[len] = 0x00;
                        printf("OnRecv: fd=%08x, data=%s\n", fd, data);
                        // 斷開(kāi)連接命令
                        if (data[0] == 'a')
                        {
                             printf("user exit command\n");
                             // 通知調(diào)度系統(tǒng)斷開(kāi)連接,當(dāng)調(diào)度系統(tǒng)處理完成后才真正斷開(kāi)連接
                             close();
                        }
                        // 通知調(diào)度系統(tǒng)接收數(shù)據(jù)
                        // 數(shù)據(jù)這時(shí)并沒(méi)有真正接收,當(dāng)客戶(hù)端有數(shù)據(jù)發(fā)送來(lái)時(shí)
                        // 調(diào)度器自動(dòng)接收數(shù)據(jù),然后通過(guò)OnRecv通知數(shù)據(jù)接收完成
                        recv(data, 255);
                   }
          // 數(shù)據(jù)緩沖區(qū)
          public: char data[256];
          };

           
          然后創(chuàng)建一個(gè)lance::net::TCPSrv<T>的實(shí)例,這個(gè)實(shí)例負(fù)責(zé)調(diào)度網(wǎng)絡(luò)服務(wù)。
           
          具體代碼參考src/main.cpplance::net::ClientOnConnectOnRecvOnDisconnect都由工作線程池處理,所以里面可以進(jìn)行IO操作而不會(huì)影響系統(tǒng)響應(yīng)。

          int main(char * args[])
          {
               lance::net::TCPSrv<MyClient> srv;
               
          // 設(shè)置監(jiān)聽(tīng)套接字綁定的IP
               
          // 0為綁定所有本機(jī)可用IP地址
               srv.ip = 0;
               
          // 監(jiān)聽(tīng)端口
               srv.port = 1234;
               
          // 綁定的對(duì)象或資源指針
               
          // MyClient里面可以通過(guò)srv->ptr獲取這個(gè)指針
               srv.ptr = NULL;
               
          // 監(jiān)聽(tīng)套接字連接隊(duì)列長(zhǎng)度
               srv.backlogs = 10;
               
          // 處理線程池線程數(shù)
               srv.threads = 1;
               
          // 調(diào)度器線程數(shù),通常是本機(jī)CPU數(shù)的2倍
               
          // 0表示自動(dòng)選擇
               srv.scheds = 0;
               
          // 啟動(dòng)網(wǎng)絡(luò)服務(wù)
               srv.start();
               
          // 循環(huán),保證進(jìn)程不退出
               while(true)
               {
                   lance::systm::sleep(2000);
               }
               return 0;
          }

           
          Windows平臺(tái)的預(yù)編譯宏是LANCE_WIN32
          Linux平臺(tái)的預(yù)編譯宏是LANCE_LINUX
          FreeBSD平臺(tái)的預(yù)編譯宏是LANCE_FREEBSD
           
          Windows平臺(tái)編譯需要使用WIN32_LEAN_AND_MEAN_WIN32_WINNT=0x0500預(yù)編譯宏來(lái)避免Winsock2Windows頭文件的沖突,否則會(huì)產(chǎn)生大量類(lèi)型重定義錯(cuò)誤。

          #define EPOLL_MAX_NFDS          10000    // max sockets queried by epoll.
          #define EPOLL_MAX_EVENTS        100      // max events queried by epoll.
          #define EPOLL_MAX_QUEUE         1024     // max events in cache queue.

           
          Linux平臺(tái)有額外三個(gè)預(yù)編譯宏,參考src/lance/eptcpsrv.hpp
          FreeBSD平臺(tái)有額外三個(gè)預(yù)編譯宏,參考src/lance/kqtcpsrv.hpp

          #define KQUEUE_MAX_NFDS 10000 // max sockets queried by kqueue.
          #define KQUEUE_MAX_EVENTS 100 // max events queried by kqueue.
          #define KQUEUE_MAX_QUEUE 1024 // max events in cache queue.

          Windows IOCP設(shè)計(jì)

          首先用戶(hù)接口部分,由兩個(gè)類(lèi)lance::net:TCPSrv<T>lance::net::Client
          lance::net::TCPSrv<T>管理監(jiān)聽(tīng)套接字、事件調(diào)度和事件處理。
          lance::net::Client管理連接套接字。
          lance::net::TCPSrv<T>lance::net::Listener<T>lance::net::Scheduler<T>lance::net::Processor<T>組成。
          他們之間的關(guān)系如下:
          500)this.width=500;" border="0" width="500">
          2
          Listener<T>管理監(jiān)聽(tīng)套接字,有單獨(dú)的線程執(zhí)行,當(dāng)有連接到來(lái)時(shí),創(chuàng)建一個(gè)Client的對(duì)象實(shí)例,然后通過(guò)IOCP系統(tǒng)調(diào)用通知調(diào)度器有連接到來(lái),參考src/lance/iocptcpsrv.hpp

          template<typename T>
          void Scheduler<T>::push(T * clt)
          {
               ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
          }

          Scheduler<T>實(shí)際并不做很多事情,只是封裝IOCP句柄,WindowsIOCP功能很豐富,包括管理事件隊(duì)列和多CPU支持,所以Scheduler只是一個(gè)IOCP的映射。
          Processor<T>管理線程池,這些線程池是工作線程,他們輪詢(xún)SchedulerIOCP,從中取出系統(tǒng)事件,IOCP里面有三種事件,一種是客戶(hù)端連接事件,一種是客戶(hù)端數(shù)據(jù)事件,最后一種是連接斷開(kāi)事件,當(dāng)有事件到來(lái)時(shí),會(huì)得到Client對(duì)象的指針cltClientevent包含了事件類(lèi)型,參考src/lance/iocptcpsrv.hpp

          template<typename T>
          DWORD WINAPI Processor<T>::run(LPVOID param)
          {
               Processor<T>& procor = *(Processor<T> *)param;
               Scheduler<T>& scheder = *procor.scheder;
               HANDLE iocp = scheder.iocp;

               DWORD ready;
               ULONG_PTR key;
               WSAOVERLAPPED * overlap;
               while (true)
               {
                   ::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE);

                   T * clt = (T *)key;
                   switch(clt->event)
                   {
                   case T::EV_RECV:
                        {
                             if (0 >= ready)
                             {
                                 clt->event = T::EV_DISCONNECT;
                                 ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
                             }
                             else
                             {
                                 clt->OnRecv(ready);
                             }
                        }
                        break;
                   case T::EV_CONNECT:
                        {
                             if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0))
                             {
                                 ::closesocket(clt->fd);
                                 delete clt;
                             }
                             else
                             {
                                 clt->OnConnect();
                             }
                        }
                        break;
                   case T::EV_DISCONNECT:
                        {
                             clt->OnDisconnect();
                             ::closesocket(clt->fd);
                             delete clt;
                        }
                        break;
                   case T::EV_SEND:
                        break;
                   }
               }

               return 0;
          }

          所以Client::OnConnectClient::OnRecvClient::OnDisconnect都在工作線程中進(jìn)行,這些處理過(guò)程中都可以有IO等耗時(shí)操作,一個(gè)連接的阻塞不會(huì)影響其他連接的響應(yīng)速度。
           
          Client的其他方法Client::recvClient::sendClient::close
          Client::recv是一個(gè)異步接收數(shù)據(jù)的方法,這個(gè)方面只是告訴IOCP想要接收客戶(hù)端的數(shù)據(jù),然后立即返回,由IOCP去負(fù)責(zé)接收數(shù)據(jù),有數(shù)據(jù)收到時(shí),Processor<T>的工作線程會(huì)收到Client::EV_RECV的消息,Processor<T>會(huì)調(diào)用Client::OnRecv進(jìn)行通知。
          Client::send是發(fā)送消息的函數(shù),這個(gè)函數(shù)是阻塞調(diào)用,等待消息發(fā)送成功后才返回。
          Client::close是主動(dòng)斷開(kāi)客戶(hù)端連接的方法,這個(gè)方法不會(huì)直接調(diào)用closesocket(fd),而是調(diào)用shutdown(fd)shutdown(fd)會(huì)向Scheduler<T>觸發(fā)一個(gè)Client::EV_DISCONNECT的事件,然后Processor<T>調(diào)用Client::OnDisconnect通知連接斷開(kāi),執(zhí)行完Client::OnDisconnect后,由Processor<T>調(diào)用closesocket(fd)真正斷開(kāi)連接,這樣設(shè)計(jì)一方面滿足任何情況下OnDisconnect都被調(diào)用,另一方面因?yàn)椴僮飨到y(tǒng)會(huì)重用已經(jīng)關(guān)閉的套接字fd,所以只有當(dāng)OnDisconnect執(zhí)行完畢后才真正調(diào)用closesocket讓操作系統(tǒng)回收fd,可以避免使用無(wú)效的套接字或者挪用其他連接的套接字。
           

          Linux epollFreeBSD kqueue設(shè)計(jì)

          Linux epollFreeBSD kqueue的機(jī)制幾乎一樣,只有函數(shù)名字和個(gè)數(shù)不一樣,所以一起分析,并且簡(jiǎn)寫(xiě)為Linux
          因?yàn)?font face="Times New Roman">Linux不像Windows一樣會(huì)管理事件隊(duì)列和多CPU支持,所以Linux需要額外實(shí)現(xiàn)事件隊(duì)列和多CPU支持。
          Linux下用戶(hù)接口跟Windows一樣,有lance::net::TCPSrv<T>lance::net::Client,因?yàn)榭缙脚_(tái),所以他們提供的接口功能和意義也一樣,參考Windows一節(jié)。
          lance::net::TCPSrv<T>管理連接套接字、事件隊(duì)列、多CPU支持、事件調(diào)度和事件處理。
          lance::net::TCPSrv<T>Listener<T>Scheduler<T>Processor<T>Queue<T>組成。
          他們之間關(guān)系圖如下:
          500)this.width=500;" border="0" width="500">
          3
          Listener<T>管理監(jiān)聽(tīng)套接字,有連接到來(lái)時(shí)創(chuàng)建一個(gè)Client的實(shí)例clt,初始化Client::eventClient::EV_CONNECT,然后將clt放入調(diào)度器,調(diào)度器為clt選擇一個(gè)合適的epoll/kqueue進(jìn)行綁定,然后將clt放入事件隊(duì)列Queue<T>等待被Processor<T>執(zhí)行。
           
          Scheduler<T>管理epoll/kqueue,為了支持多CPU,一個(gè)Scheduler<T>可能管理多個(gè)epoll/kqueue,通過(guò)lance::net::TCPSrv::scheds進(jìn)行設(shè)置,當(dāng)lance::net::TCPSrv::scheds大于1時(shí),Scheduler<T>將創(chuàng)建scheds個(gè)線程,每個(gè)線程管理一個(gè)epoll/kqueue。當(dāng)Listener<T>提交一個(gè)新的clt時(shí),Scheduler<T>順序選擇一個(gè)epoll/kqueue進(jìn)行綁定,這是最簡(jiǎn)單的均等選擇算法,epoll/kqueue會(huì)檢查綁定的clt的數(shù)據(jù)接收和連接斷開(kāi)事件,如果有事件,會(huì)把產(chǎn)生這個(gè)事件的clt放入事件隊(duì)列Queue<T>等待被Processor<T>執(zhí)行,并且設(shè)置clt的套接字為休眠狀態(tài),因?yàn)?font face="Times New Roman">epoll/kqueue為狀態(tài)觸發(fā),如果事件在被Processor<T>處理前不休眠,會(huì)再次被觸發(fā),這樣Queue<T>將被迅速填滿。
          CPU時(shí),依靠多個(gè)epoll/kqueue能有效利用這些CPU
          參考eptcpsrv.hpp

          template<typename T>
          void Scheduler<T>::push(T * clt)
          {
               clt->epfd = epers[epoff].epfd;
               epoff = (epoff+1 == scheds)?0:(epoff+1);
               queue.in();
               while (queue.full())
               {
                   queue.fullWait();
               }
               if (queue.empty())
               {
                   queue.emptyNotify();
               }
               queue.push(clt);
               queue.out();
          }

           
          Queue<T>是有限緩沖隊(duì)列,有隊(duì)列最大長(zhǎng)度EPOLL_MAX_QUEUE/KQUEUE_MAX_QUEUE,有限緩沖隊(duì)列結(jié)構(gòu)如下:

          500)this.width=500;" border="0">

          4
          Queue<T>采用monitor模式,使用pthread_mutex_t lock保護(hù)臨界區(qū),使用pthread_cond_t emptySignal做隊(duì)列由空到不空的通知,也就是喚醒消費(fèi)者可以處理隊(duì)列,使用pthread_cond_t fullSignal做隊(duì)列由滿到不滿的通知,也就是喚醒生產(chǎn)者可以填充隊(duì)列,這里Scheduler<T>是生產(chǎn)者,Processor<T>是消費(fèi)者。
          有時(shí)epoll/kqueue會(huì)一次產(chǎn)生多個(gè)事件,如果先前隊(duì)列為空,那么需要通知Processor<T>可以處理事件,因?yàn)?font face="Times New Roman">emptySignal.notify只能一次喚醒一個(gè)線程,為了更加高效的處理事件,應(yīng)該使用emptySignal.broadcast喚醒所有工作線程。
          如果epoll/kqueue一次只產(chǎn)生了一個(gè)事件,并且先前隊(duì)列為空,那么只需要使用emptySignal.notify喚醒一個(gè)工作線程而不應(yīng)該使用emptySignal.broadcast喚醒工作線程,因?yàn)橹挥幸粋€(gè)事件,所以只有一個(gè)線程會(huì)處理事件,而其他線程會(huì)空轉(zhuǎn)一次消耗資源。
          如果epoll/kqueue產(chǎn)生了事件,但是隊(duì)列不為空,那么不需要喚醒工作線程的操作,因?yàn)殛?duì)列不為空的時(shí)候,沒(méi)有任何工作線程處于等待狀態(tài)。
          代碼參考eptcpsrv.hpp/Queue<T>
           
          Processor<T>Windows基本一樣,Processor<T>Queue<T>取出事件,然后根據(jù)clt->event事件類(lèi)型調(diào)用響應(yīng)的事件通知函數(shù)。
           
          Client::recv也是一個(gè)請(qǐng)求接收數(shù)據(jù)的過(guò)程,并不實(shí)際接收數(shù)據(jù),當(dāng)有數(shù)據(jù)到來(lái)時(shí),Processor<T>的工作線程負(fù)責(zé)接收數(shù)據(jù),然后調(diào)用Client::OnRecv通知響應(yīng)的連接對(duì)象。
          Cleint::send是一個(gè)同步阻塞函數(shù),等待數(shù)據(jù)真正發(fā)送完成后再返回。
          Client::closeWindows類(lèi)似,只是調(diào)用shutdown來(lái)觸發(fā)斷開(kāi)消息,然后處理流程跟Windows一致。




          轉(zhuǎn)自:http://blog.chinaunix.net/u1/52224/showart_425449.html

          posted on 2009-07-27 22:09 石頭@ 閱讀(1546) 評(píng)論(0)  編輯  收藏 所屬分類(lèi): Tcp/Ip

          主站蜘蛛池模板: 榆树市| 错那县| 申扎县| 镇康县| 桐柏县| 抚宁县| 梧州市| 威远县| 乳山市| 肇州县| 纳雍县| 栖霞市| 鸡泽县| 拜泉县| 莱西市| 万盛区| 琼结县| 湄潭县| 海原县| 宾阳县| 南开区| 塔城市| 洛南县| 额尔古纳市| 新野县| 奉贤区| 庄河市| 霍林郭勒市| 确山县| 南京市| 仁寿县| 墨江| 邵阳县| 乌拉特中旗| 大宁县| 横峰县| 梧州市| 清水县| 龙海市| 莲花县| 云霄县|