Chan Chen Coding...

          Netty 4.0 源碼分析(三):Channel和ChannelPipeline

          Clientserver通過Channel連接,然后通過ByteBuf進(jìn)行傳輸。每個Channel有自己的PipelinePipeline上面可以添加和定義HandlerEvent

           

          Channel

           1 package io.netty.channel;
           2
           import io.netty.buffer.ByteBuf;
           3 import io.netty.buffer.MessageBuf;
           4 import io.netty.channel.socket.DatagramChannel;
           5 import io.netty.channel.socket.ServerSocketChannel;
           6 import io.netty.channel.socket.SocketChannel;
           7 import io.netty.util.AttributeMap;
           8 import java.net.InetSocketAddress;
           9 import java.net.SocketAddress;
          10 import java.nio.channels.SelectionKey;
          11 public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFutureFactory, Comparable<Channel> {
          12     Integer id();
          13     EventLoop eventLoop();
          14     Channel parent();
          15     ChannelConfig config();
          16     ChannelPipeline pipeline();
          17     boolean isOpen();
          18     boolean isRegistered();
          19     boolean isActive();
          20     ChannelMetadata metadata();
          21     ByteBuf outboundByteBuffer();
          22     <T> MessageBuf<T> outboundMessageBuffer();
          23     SocketAddress localAddress();
          24     SocketAddress remoteAddress();
          25     ChannelFuture closeFuture();
          26     Unsafe unsafe();
          27     interface Unsafe {
          28         ChannelHandlerContext directOutboundContext();
          29         ChannelFuture voidFuture();
          30         SocketAddress localAddress();
          31         SocketAddress remoteAddress();
          32         void register(EventLoop eventLoop, ChannelFuture future);
          33         void bind(SocketAddress localAddress, ChannelFuture future);
          34         void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
          35         void disconnect(ChannelFuture future);
          36         void close(ChannelFuture future);
          37         void closeForcibly();
          38         void deregister(ChannelFuture future);
          39         void flush(ChannelFuture future);
          40         void flushNow();
          41         void suspendRead();
          42         void resumeRead();
          43     }
          44 }
          45 


          ChannelUML


          Netty 4.0
          中,定義了Channel接口,這個接口用于連接網(wǎng)絡(luò)的socket傳輸,或者具有I/O操作的組件連接。這里的I/O操作有,read,write,bind,connect

           

          Channel接口為用戶提供了:

          1.   Channel的當(dāng)前狀態(tài),比如:Channel是否open,或者Channel是否已經(jīng)連接。

          2.   Channel的參數(shù),比如:接受的buffer大小。

          3.   Channel支持的I/O操作,比如:read,write,connect,bind

          4.   注冊在Channel上的ChannelPipelineChannelPipeline用于處理所有的I/O事件和請求。

           

          Channel類的幾個重要方法

          ChannelFuture closeFuture();

          所有在Netty中的I/O操作都是異步的,這就意味著任何的I/O調(diào)用都會立即返回,但是無法保證所有被調(diào)用的I/O操作到最后能夠成功執(zhí)行完成。closeFuture() 返回一個ChannelFuture對象, 并且告訴I/O的調(diào)用者,這個I/O調(diào)用的最后狀態(tài)是succeeded,failed 或者 canceled

           

          void register(EventLoop eventLoop, ChannelFuture future);

          Channel中注冊EventLoop和對應(yīng)的ChannelFuture

           

          void deregister(ChannelFuture future);

          Channel中取消ChannelFuture的注冊。

           

           

          Channel的層次結(jié)構(gòu)中,Channel子類的實(shí)現(xiàn)取決于傳輸?shù)木唧w實(shí)現(xiàn)。比如SocketChannel,能夠被ServerSocketChannel接受,并且SocketChannel中的getParent()方法會返回ServerSocketChannel。開發(fā)者可以實(shí)現(xiàn)Channel接口,共享Socket連接,比如SSH

           

          ChannelPipeLine接口

           1 public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker {
           2     MessageBuf<Object> inboundMessageBuffer();
           3     ByteBuf inboundByteBuffer();
           4     MessageBuf<Object> outboundMessageBuffer();
           5     ByteBuf outboundByteBuffer();
           6     ChannelPipeline addFirst(String name, ChannelHandler handler);
           7     ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
           8     ChannelPipeline addLast(String name, ChannelHandler handler);
           9     ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
          10     ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
          11     ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
          12     ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
          13     ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
          14     ChannelPipeline addFirst(ChannelHandler handlers);
          15     ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler handlers);
          16     ChannelPipeline addLast(ChannelHandler handlers);
          17     ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler handlers);
          18     void remove(ChannelHandler handler);
          19     ChannelHandler remove(String name);
          20     <T extends ChannelHandler> T remove(Class<T> handlerType);
          21     ChannelHandler removeFirst();
          22     ChannelHandler removeLast();
          23     void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
          24     ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
          25     <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
          26     ChannelHandler first();
          27     ChannelHandlerContext firstContext();
          28     ChannelHandler last();
          29     ChannelHandlerContext lastContext();
          30     ChannelHandler get(String name);
          31     <T extends ChannelHandler> T get(Class<T> handlerType);
          32     ChannelHandlerContext context(ChannelHandler handler);
          33     ChannelHandlerContext context(String name);
          34     ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
          35     Channel channel();
          36     List<String> names();
          37     Map<String, ChannelHandler> toMap();
          38 }

           

           


           

          ChannelPipelineUML


          ChannelHandler
          接口用于處理和攔截Channel接口中的ChannelEventsNetty中的ChannelPipeline概念使用了Intecepting Filter Patter模式來實(shí)現(xiàn),這樣做的好處是允許用戶可以完全控制Event的處理,并且控制ChannelHandlersChannelPipeline的交互。

           

          當(dāng)一個全新的Channel創(chuàng)建的時候,都必須創(chuàng)建一個ChannelPipeline,并使ChannelChannelPipeline想關(guān)聯(lián)。這種關(guān)聯(lián)關(guān)系式永久性的,這意味著,一旦一個ChannelPipeleineChannel關(guān)聯(lián)了,那么這個Channel就在也無法關(guān)聯(lián)其他ChannelPipeline,也無法取消與當(dāng)前ChannelPipeline的關(guān)聯(lián)。

          【官方推薦】使用Channel接口中的pipeleine()方法來創(chuàng)建一個ChannelPipelien,而不要用new去實(shí)例一個ChannePipeline

          ChannelPipeline pipeline = Channel.pipeline();

           

          Pipeline中的事件流


          ChannelPipeline中的事件流

          圖中顯示了一個典型的ChannelHandler或者ChannelPipeline對于ChannelEvent的處理流程。ChannelHandler接口有兩個子類,分別是ChannelUpstreamHandler(ChannelInboundHandler)ChannelDownstreamHandler(ChannelOutBoundstreamHandler)。這兩個之類用于處理每一個ChannelEvent,然后由ChannelHandlerContext.sendUpstream(ChannelEvent)ChannelHandlerContext.sendDownstream(ChannelEvent)將每一個ChannelEvent轉(zhuǎn)發(fā)到最近的handler。根據(jù)upstreamdownstream的不同,每個Event的處理也會有所不同。

           

          如事件流圖中的左邊顯示,Upstream Handlers會從低至上的處理一個Upstream EventInbound的數(shù)據(jù)有圖中底部的Netty Internal I/O threads生成。通過調(diào)用InputStream.readByte(byte[])方法,可以從一個遠(yuǎn)程的服務(wù)器讀取inbound data。如果一個upstream event達(dá)到upstream handler的頂部,那么這個upstream event最終將被丟棄掉。

           

          如事件流圖中的右邊顯示,Dpstream Handlers會從低至上的處理一個Upstream EventDownstream Handler會生成和傳輸outbount數(shù)據(jù)流,比如一個寫操作。當(dāng)一個Downstream Event達(dá)到Downstream Handler的底部,那么與之相關(guān)的Channal中的I/O thread對對其進(jìn)行處理。Channel中的I/Othread會執(zhí)行真正的操作,例如OutputStream.write(byte[])

           

           

          假設(shè)我們創(chuàng)建了這么一個ChannelPipeline

          ChannelPipelien p = Channel.pipeline();

          p.addLast(“1”, new UpstreamHandlerA());

          p.addList(“2”, new UpstreamHandlerB());

          p.addList(“3”, new DownstreamHandlerA());

          p.addList(“4”, new DownstreamHandlerB());

          p.addList(“5”, new UpstreamHandlerX());

          ChannelPipeline的棧中,upstream的執(zhí)行順序是12,而downstream的執(zhí)行順序是43

           

          生產(chǎn)Pipeline

          在一條Pipeline中,一般會有一個或者多個ChannelHandler用于接收I/O事件(read)或者請求I/O操作(writeclose)。一個典型的服務(wù)器會有如下的一個ChannelPipeline用于處理不同的ChannelHandler

          ChannelPipelien p = Channel.pipeline();

          p.addLast(“decoder”, new MyProtocalDecoder());

          p.addList(“encoder”, new MyProtocalEncoder());

          p.addList(“executor”, new ExectionHandler());

          p.addList(“handler”, new MyBusinessLogicHandler());

           

          1.   Protocal Decoder – 將二進(jìn)制數(shù)據(jù)(如ByteBuf)裝換成Java對象。

          2.   Protocal Encoder – Java對象裝換成二進(jìn)制數(shù)據(jù)。

          3.   ExecutionHandler – 使用一個線程模型。

          4.   BusinessLogicHandler – 執(zhí)行一個具體的業(yè)務(wù)邏輯(如數(shù)據(jù)庫訪問)

           

          由于ChannelPipeline是線程安全的,所以ChannelHandler可以再任何時候從ChannelPipeline中被添加或者刪除。例如,可以插入一個Handler用于處理被加密過的敏感數(shù)據(jù)信息,在處理之后,刪除掉這個Handler

           
          備注:因?yàn)楣P者開始寫Netty源碼分析的時候,Netty 4.0還是處于Alpha階段,之后的API可能還會有改動,筆者將會及時更改。使用開源已經(jīng)有好幾年的時間了,一直沒有時間和精力來具體研究某個開源項(xiàng)目的具體實(shí)現(xiàn),這次是第一次寫開源項(xiàng)目的源碼分析,如果文中有錯誤的地方,歡迎讀者可以留言指出。對于轉(zhuǎn)載的讀者,請注明文章的出處。
          希望和廣大的開發(fā)者/開源愛好者進(jìn)行交流,歡迎大家的留言和討論。

           



          -----------------------------------------------------
          Silence, the way to avoid many problems;
          Smile, the way to solve many problems;

          posted on 2012-11-25 14:53 Chan Chen 閱讀(9158) 評論(0)  編輯  收藏 所屬分類: Netty

          主站蜘蛛池模板: 龙海市| 北票市| 启东市| 顺昌县| 敦化市| 三江| 古田县| 汕尾市| 本溪市| 宜都市| 云林县| 彭阳县| 吐鲁番市| 东乡| 曲麻莱县| 澄城县| 清镇市| 文登市| 莲花县| 黔南| 鹤壁市| 石台县| 汉沽区| 申扎县| 莲花县| 曲靖市| 荆州市| 昔阳县| 永福县| 忻州市| 景德镇市| 历史| 苏尼特左旗| 瑞丽市| 台南县| 呼和浩特市| 清流县| 闽清县| 山东| 新昌县| 清镇市|