隨筆-7  評(píng)論-23  文章-0  trackbacks-0
          Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶(hù)端程序[官方定義],整體來(lái)看其包含了以下內(nèi)容:1.提供了豐富的協(xié)議編解碼支持,2.實(shí)現(xiàn)自有的buffer系統(tǒng),減少?gòu)?fù)制所帶來(lái)的消耗,3.整套channel的實(shí)現(xiàn),4.基于事件的過(guò)程流轉(zhuǎn)以及完整的網(wǎng)絡(luò)事件響應(yīng)與擴(kuò)展,5.豐富的example。本文并不對(duì)Netty實(shí)際使用中可能出現(xiàn)的問(wèn)題做分析,只是從代碼角度分析它的架構(gòu)以及實(shí)現(xiàn)上的一些關(guān)鍵細(xì)節(jié)。

          首先來(lái)看下最如何使用Netty(其自帶example很好展示了使用),Netty普通使用一般是通過(guò)BootStrap來(lái)啟動(dòng),BootStrap主要分為兩類(lèi):1.面向連接(TCP)的BootStrap(ClientBootStrap和ServerBootstrap),2.非面向連接(UDP)的(ConnectionlessBootstrap)。

          Netty
          整體架構(gòu)很清晰的分成2個(gè)部分,ChannelFactory和ChannelPipelineFactory,前者主要生產(chǎn)網(wǎng)絡(luò)通信相關(guān)的Channel實(shí)例和ChannelSink實(shí)例,Netty提供的ChannelFactory實(shí)現(xiàn)基本能夠滿(mǎn)足絕大部分用戶(hù)的需求,當(dāng)然你也可以定制自己的ChannelFactory,后者主要關(guān)注于具體傳輸數(shù)據(jù)的處理,同時(shí)也包括其他方面的內(nèi)容,比如異常處理等等,只要是你希望的,你都可以往里添加相應(yīng)的handler,一般ChannelPipelineFactory由用戶(hù)自己實(shí)現(xiàn),因?yàn)閭鬏敂?shù)據(jù)的處理及其他操作和業(yè)務(wù)關(guān)聯(lián)比較緊密,需要自定義處理的handler。

          現(xiàn)在,使用Netty的步驟實(shí)際上已經(jīng)非常明確了,比如面向連接的Netty服務(wù)端客戶(hù)端使用,第一步:實(shí)例化一個(gè)BootStrap,并且通過(guò)構(gòu)造方法指定一個(gè)ChannelFactory實(shí)現(xiàn),第二步:向bootstrap實(shí)例注冊(cè)一個(gè)自己實(shí)現(xiàn)的ChannelPipelineFactory,第三步:如果是服務(wù)器端,bootstrap.bind(new InetSocketAddress(port)),然后等待客戶(hù)端來(lái)連接,如果是客戶(hù)端,bootstrap.connect(new InetSocketAddress(host,port))取得一個(gè)future,這個(gè)時(shí)候Netty會(huì)去連接遠(yuǎn)程主機(jī),在連接完成后,會(huì)發(fā)起類(lèi)型為CONNECTED的ChannelStateEvent,并且開(kāi)始在你自定義的Pipeline里面流轉(zhuǎn),如果你注冊(cè)的handler有這個(gè)事件的響應(yīng)方法的話(huà)那么就會(huì)調(diào)用到這個(gè)方法。在此之后就是數(shù)據(jù)的傳輸了。下面是一個(gè)簡(jiǎn)單客戶(hù)端的代碼解讀。

          // 實(shí)例化一個(gè)客戶(hù)端Bootstrap實(shí)例,其中NioClientSocketChannelFactory實(shí)例由Netty提供
                  ClientBootstrap bootstrap = new ClientBootstrap(
                          
          new NioClientSocketChannelFactory(
                                  Executors.newCachedThreadPool(),
                                  Executors.newCachedThreadPool()));

                  
          // 設(shè)置PipelineFactory,由客戶(hù)端自己實(shí)現(xiàn)
                  bootstrap.setPipelineFactory(new FactorialClientPipelineFactory(count));

                  
          //向目標(biāo)地址發(fā)起一個(gè)連接
                  ChannelFuture connectFuture =
                      bootstrap.connect(
          new InetSocketAddress(host, port));

                  
          // 等待鏈接成功,成功后發(fā)起的connected事件將會(huì)使handler開(kāi)始發(fā)送信息并且等待messageRecive,當(dāng)然這只是示例。
                  Channel channel = connectFuture.awaitUninterruptibly().getChannel();

                  
          // 得到用戶(hù)自定義的handler
                  FactorialClientHandler handler =
                      (FactorialClientHandler) channel.getPipeline().getLast();

                  
          // 從handler里面取數(shù)據(jù)并且打印,這里需要注意的是,handler.getFactorial使用了從結(jié)果隊(duì)列result take數(shù)據(jù)的阻塞方法,而結(jié)果隊(duì)列會(huì)在messageRecieve事件發(fā)生時(shí)被填充接收回來(lái)的數(shù)據(jù)
                  System.err.format(
                          
          "Factorial of %,d is: %,d", count, handler.getFactorial());

          Netty提供了NIO與BIO(OIO)兩種模式處理這些邏輯,其中NIO主要通過(guò)一個(gè)BOSS線(xiàn)程處理等待鏈接的接入,若干個(gè)WORKER線(xiàn)程(從worker線(xiàn)程池中挑選一個(gè)賦給Channel實(shí)例,因?yàn)镃hannel實(shí)例持有真正的java網(wǎng)絡(luò)對(duì)象)接過(guò)BOSS線(xiàn)程遞交過(guò)來(lái)的CHANNEL進(jìn)行數(shù)據(jù)讀寫(xiě)并且觸發(fā)相應(yīng)事件傳遞給pipeline進(jìn)行數(shù)據(jù)處理,而B(niǎo)IO(OIO)方式服務(wù)器端雖然還是通過(guò)一個(gè)BOSS線(xiàn)程來(lái)處理等待鏈接的接入,但是客戶(hù)端是由主線(xiàn)程直接connect,另外寫(xiě)數(shù)據(jù)C/S兩端都是直接主線(xiàn)程寫(xiě),而數(shù)據(jù)讀操作是通過(guò)一個(gè)WORKER 線(xiàn)程BLOCK方式讀取(一直等待,直到讀到數(shù)據(jù),除非channel關(guān)閉)。

          網(wǎng)絡(luò)動(dòng)作歸結(jié)到最簡(jiǎn)單就是服務(wù)器端bind->accept->read->write,客戶(hù)端connect->read->write,一般bind或者connect后會(huì)有多次read、write。這種特性導(dǎo)致,bind,accept與read,write的線(xiàn)程分離,connect與read、write線(xiàn)程分離,這樣做的好處就是無(wú)論是服務(wù)器端還是客戶(hù)端吞吐量將有效增大,以便充分利用機(jī)器的處理能力,而不是卡在網(wǎng)絡(luò)連接上,不過(guò)一旦機(jī)器處理能力充分利用后,這種方式反而可能會(huì)因?yàn)檫^(guò)于頻繁的線(xiàn)程切換導(dǎo)致性能損失而得不償失,并且這種處理模型復(fù)雜度比較高。


          采用什么樣的網(wǎng)絡(luò)事件響應(yīng)處理機(jī)制對(duì)于網(wǎng)絡(luò)吞吐量是非常重要的,Netty采用的是標(biāo)準(zhǔn)的SEDA(Staged Event-Driven Architecture)架構(gòu)[http://en.wikipedia.org/wiki/ Staged_event-driven_architecture],其所設(shè)計(jì)的事件類(lèi)型,代表了網(wǎng)絡(luò)交互的各個(gè)階段,并且在每個(gè)階段發(fā)生時(shí),觸發(fā)相應(yīng)事件交給初始化時(shí)生成的pipeline實(shí)例進(jìn)行處理。事件處理都是通過(guò)Channels類(lèi)的靜態(tài)方法調(diào)用開(kāi)始的,將事件、channel傳遞給channel持有的Pipeline進(jìn)行處理,Channels類(lèi)幾乎所有方法都為靜態(tài),提供一種Proxy的效果(整個(gè)工程里無(wú)論何時(shí)何地都可以調(diào)用其靜態(tài)方法觸發(fā)固定的事件流轉(zhuǎn),但其本身并不關(guān)注具體的處理流程)。

          Channels
          部分事件流轉(zhuǎn)靜態(tài)方法
          1.fireChannelOpen  2.fireChannelBound  3.fireChannelConnected  4.fireMessageReceived  5.fireWriteComplete 6.fireChannelInterestChanged
          7.fireChannelDisconnected 8.fireChannelUnbound 9.fireChannelClosed 10.fireExceptionCaught 11.fireChildChannelStateChanged

          Netty提供了全面而又豐富的網(wǎng)絡(luò)事件類(lèi)型,其將java中的網(wǎng)絡(luò)事件分為了兩種類(lèi)型Upstream和Downstream。一般來(lái)說(shuō),Upstream類(lèi)型的事件主要是由網(wǎng)絡(luò)底層反饋給Netty的,比如messageReceived,channelConnected等事件,而Downstream類(lèi)型的事件是由框架自己發(fā)起的,比如bind,write,connect,close等事件。



          NettyUpstreamDownstream網(wǎng)絡(luò)事件類(lèi)型特性也使一個(gè)Handler分為了3種類(lèi)型,專(zhuān)門(mén)處理Upstream,專(zhuān)門(mén)處理Downstream,同時(shí)處理Upstream,Downstream。實(shí)現(xiàn)方式是某個(gè)具體Handler通過(guò)繼承ChannelUpstreamHandlerChannelDownstreamHandler類(lèi)來(lái)進(jìn)行區(qū)分。PipeLineDownstream或者Upstream類(lèi)型的網(wǎng)絡(luò)事件發(fā)生時(shí),會(huì)調(diào)用匹配事件類(lèi)型的Handler響應(yīng)這種調(diào)用。ChannelPipeline維持有所有handler有序鏈表,并且由handler自身控制是否繼續(xù)流轉(zhuǎn)到下一個(gè)handler(ctx.sendDownstream(e),這樣設(shè)計(jì)有個(gè)好處就是隨時(shí)終止流轉(zhuǎn),業(yè)務(wù)目的達(dá)到無(wú)需繼續(xù)流轉(zhuǎn)到下一個(gè)handler)。下面的代碼是取得下一個(gè)處理Downstream事件的處理器。

          1DefaultChannelHandlerContext realCtx = ctx;
          2while (!realCtx.canHandleUpstream()) {
          3    realCtx = realCtx.next;
          4    if (realCtx == null{
          5        return null;
          6    }

          7}

          8
          9return realCtx;

          如果是一個(gè)網(wǎng)絡(luò)會(huì)話(huà)最末端的事件,比如messageRecieve,那么可能在某個(gè)handler里面就直接結(jié)束整個(gè)會(huì)話(huà),并把數(shù)據(jù)交給上層應(yīng)用,但是如果是網(wǎng)絡(luò)會(huì)話(huà)的中途事件,比如connect事件,那么當(dāng)觸發(fā)connect事件時(shí),經(jīng)過(guò)pipeline流轉(zhuǎn),最終會(huì)到達(dá)掛載pipeline最底下的ChannelSink實(shí)例中,這類(lèi)實(shí)例主要作用就是發(fā)送請(qǐng)求和接收請(qǐng)求,以及數(shù)據(jù)的讀寫(xiě)操作。



          NIO方式ChannelSink一般會(huì)有1個(gè)BOSS實(shí)例(implements Runnable),以及若干個(gè)worker實(shí)例(不設(shè)置默認(rèn)為cpu cores*2個(gè)worker),這在前面已經(jīng)提起過(guò),BOSS線(xiàn)程在客戶(hù)端類(lèi)型的ChannelSink和服務(wù)器端類(lèi)型的ChannelSink觸發(fā)條件不一樣,客戶(hù)端類(lèi)型的BOSS線(xiàn)程是在發(fā)生connect事件時(shí)啟動(dòng),主要監(jiān)聽(tīng)connect是否成功,如果成功,將啟動(dòng)一個(gè)worker線(xiàn)程,connectedchannel交給這個(gè)線(xiàn)程繼續(xù)下面的工作,而服務(wù)器端的BOSS線(xiàn)程是發(fā)生在bind事件時(shí)啟動(dòng),它的工作也相對(duì)比較簡(jiǎn)單,對(duì)于channel.socket().accept()進(jìn)來(lái)的請(qǐng)求向Nioworker進(jìn)行工作分配即可。這里需要提到的是,Server端ChannelSink實(shí)現(xiàn)比較特別,無(wú)論是NioServerSocketPipelineSink還是OioServerSocketPipelineSink的eventSunk方法實(shí)現(xiàn)都將channel分為ServerSocketChannel和SocketChannel分開(kāi)處理。這主要原因是Boss線(xiàn)程accept()一個(gè)新的連接生成一個(gè)SocketChannel交給Worker進(jìn)行數(shù)據(jù)接收。

          1   public void eventSunk(
          2            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
          3        Channel channel = e.getChannel();
          4        if (channel instanceof NioServerSocketChannel) {
          5            handleServerSocket(e);
          6        }
           else if (channel instanceof NioSocketChannel) {
          7            handleAcceptedSocket(e);
          8        }

          9    }

          1 NioWorker worker = nextWorker();
          2                worker.register(new NioAcceptedSocketChannel(
          3                        channel.getFactory(), pipeline, channel,
          4                        NioServerSocketPipelineSink.this, acceptedSocket,
          5                        worker, currentThread), null);

          另外兩者實(shí)例化時(shí)都會(huì)走一遍如下流程:

          1  setConnected();
          2        fireChannelOpen(this);
          3        fireChannelBound(this, getLocalAddress());
          4        fireChannelConnected(this, getRemoteAddress());

          而對(duì)應(yīng)的ChannelSink里面的處理代碼就不同于ServerSocketChannel了,因?yàn)樽叩氖莌andleAcceptedSocket(e)這一塊代碼,從默認(rèn)實(shí)現(xiàn)代碼來(lái)說(shuō),實(shí)例化調(diào)用fireChannelOpen(this);fireChannelBound(this,getLocalAddress());fireChannelConnected(this,getRemoteAddress())沒(méi)有什么意義,但是對(duì)于自己實(shí)現(xiàn)的ChannelSink有著特殊意義。具體的用途我沒(méi)去了解,但是可以讓用戶(hù)插手Server accept連接到準(zhǔn)備讀寫(xiě)數(shù)據(jù)這一個(gè)過(guò)程的處理。

           1  switch (state) {
           2            case OPEN:
           3                if (Boolean.FALSE.equals(value)) {
           4                    channel.worker.close(channel, future);
           5                }

           6                break;
           7            case BOUND:
           8            case CONNECTED:
           9                if (value == null{
          10                    channel.worker.close(channel, future);
          11                }

          12                break;
          13            case INTEREST_OPS:
          14                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
          15                break;
          16            }


          Netty提供了大量的handler來(lái)處理網(wǎng)絡(luò)數(shù)據(jù),但是大部分是CODEC相關(guān)的,以便支持多種協(xié)議,下面一個(gè)圖繪制了現(xiàn)階段Netty提供的Handlers(紅色部分不完全)



          Netty實(shí)現(xiàn)封裝實(shí)現(xiàn)了自己的一套ByteBuffer系統(tǒng),這個(gè)ByteBuffer系統(tǒng)對(duì)外統(tǒng)一的接口就是ChannelBuffer,這個(gè)接口從整體上來(lái)說(shuō)定義了兩類(lèi)方法,一種是類(lèi)似getXXX(int index…)setXXX(int index…)需要指定開(kāi)始操作buffer的起始位置,簡(jiǎn)單點(diǎn)來(lái)說(shuō)就是直接操作底層buffer,并不用到Netty特有的高可重用性buffer特性,所以Netty內(nèi)部對(duì)于這類(lèi)方法調(diào)用非常少,另外一種是類(lèi)似readXXX(),writeXXX()不需要指定位置的buffer操作,這類(lèi)方法實(shí)現(xiàn)放在了AbstractChannelBuffer,其主要的特性就是維持buffer的位置信息,包括readerIndex,writerIndex,以及回溯作用的markedReaderIndexmarkedWriterIndex,當(dāng)用戶(hù)調(diào)用readXXX()或者writeXXX()方法時(shí),AbstractChannelBuffer會(huì)根據(jù)維護(hù)的readerIndex,writerIndex計(jì)算出讀取位置,然后調(diào)用繼承自己的ChannelBuffergetXXX(int index…)或者setXXX(int index…)方法返回結(jié)果,這類(lèi)方法在Netty內(nèi)部被大量調(diào)用,因?yàn)檫@個(gè)特性最大的好處就是很方便地重用buffer而不必去費(fèi)心費(fèi)力維護(hù)index或者新建大量的ByteBuffer

          另外WrappedChannelBuffer接口提供的是對(duì)ChannelBuffer的代理,他的用途說(shuō)白了就是重用底層buffer,但是會(huì)轉(zhuǎn)換一些buffer的角色,比如原本是讀寫(xiě)皆可 wrapReadOnlyChannelBuffer,那么整個(gè)buffer只能使用readXXX()或者getXXX()方法,也就是只讀,然后底層的buffer還是原來(lái)那個(gè),再如一個(gè)已經(jīng)進(jìn)行過(guò)讀寫(xiě)的ChannelBufferwrapTruncatedChannelBuffer,那么新的buffer將會(huì)忽略掉被wrapbuffer內(nèi)數(shù)據(jù),并且可以指定新的writeIndex,相當(dāng)于slice功能。




          Netty實(shí)現(xiàn)了自己的一套完整Channel系統(tǒng),這個(gè)channel說(shuō)實(shí)在也是對(duì)java 網(wǎng)絡(luò)做了一層封裝,加上了SEDA特性(基于事件響應(yīng),異步,多線(xiàn)程等)。其最終的網(wǎng)絡(luò)通信還是依靠底下的java網(wǎng)絡(luò)api。提到異步,不得不提到NettyFuture系統(tǒng),從channel的定義來(lái)說(shuō),write,bind,connect,disconnect,unbind,close,甚至包括setInterestOps等方法都會(huì)返回一個(gè)channelFuture,這這些方法調(diào)用都會(huì)觸發(fā)相關(guān)網(wǎng)絡(luò)事件,并且在pipeline中流轉(zhuǎn)。Channel很多方法調(diào)用基本上不會(huì)馬上就執(zhí)行到最底層,而是觸發(fā)事件,在pipeline中走一圈,最后才在channelsink中執(zhí)行相關(guān)操作,如果涉及網(wǎng)絡(luò)操作,那么最終調(diào)用會(huì)回到Channel中,也就是serversocketchannel,socketchannel,serversocket,socketjava原生網(wǎng)絡(luò)api的調(diào)用,而這些實(shí)例就是jboss實(shí)現(xiàn)的channel所持有的(部分channel)



           

          Netty新版本出現(xiàn)了一個(gè)特性zero-copy,這個(gè)機(jī)制可以使文件內(nèi)容直接傳輸?shù)较鄳?yīng)channel上而不需要通過(guò)cpu參與,也就少了一次內(nèi)存復(fù)制。Netty內(nèi)部ChunkedFile FileRegion 構(gòu)成了non zero-copy zero-copy兩種形式的文件內(nèi)容傳輸機(jī)制,前者需要CPU參與,后者根據(jù)操作系統(tǒng)是否支持zero-copy將文件數(shù)據(jù)傳輸?shù)教囟?/span>channel,如果操作系統(tǒng)支持,不需要cpu參與,從而少了一次內(nèi)存復(fù)制。ChunkedFile主要使用fileread,readFullyAPI,而FileRegion使用FileChanneltransferTo API2者實(shí)現(xiàn)并不復(fù)雜。Zero-copy的特性還是得看操作系統(tǒng)的,本身代碼沒(méi)有很大的特別之處。

          最后總結(jié)下,Netty的架構(gòu)思想和細(xì)節(jié)可以說(shuō)讓人眼前一亮,對(duì)于java網(wǎng)絡(luò)IO的各個(gè)注意點(diǎn),可以說(shuō)Netty已經(jīng)解決得比較完全了,同時(shí)Netty的作者也是另外一個(gè)NIO框架MINA的作者,在實(shí)際使用中積累了豐富的經(jīng)驗(yàn),但是本文也只是一個(gè)新手對(duì)于Netty的初步理解,還沒(méi)有足夠的能力指出某一細(xì)節(jié)的所發(fā)揮的作用。

          posted on 2010-09-25 12:10 BucketLI 閱讀(3517) 評(píng)論(1)  編輯  收藏

          評(píng)論:
          # re: Netty代碼分析 2010-12-28 16:34 | simaliu
          很不錯(cuò)的文章,可能這里有一個(gè)小筆誤“客戶(hù)端connect->read->write”,應(yīng)該是“客戶(hù)端connect->write->read”吧?  回復(fù)  更多評(píng)論
            

          只有注冊(cè)用戶(hù)登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          <2010年9月>
          2930311234
          567891011
          12131415161718
          19202122232425
          262728293012
          3456789

          常用鏈接

          留言簿(1)

          隨筆檔案

          搜索

          •  

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 鄂托克旗| 海原县| 通辽市| 繁峙县| 麻阳| 洛宁县| 德州市| 图们市| 台江县| 巨鹿县| 永春县| 富顺县| 宜州市| 治多县| 交口县| 白山市| 长沙县| 杨浦区| 丽水市| 德惠市| 万荣县| 曲水县| 哈密市| 叶城县| 梧州市| 孟津县| 东城区| 虞城县| 徐水县| 临桂县| 扎鲁特旗| 托里县| 忻城县| 高碑店市| 宜宾市| 疏勒县| 广宗县| 昌黎县| 海宁市| 阿城市| 东海县|