上善若水
          In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
          posts - 146,comments - 147,trackbacks - 0
          <2015年9月>
          303112345
          6789101112
          13141516171819
          20212223242526
          27282930123
          45678910

          常用鏈接

          留言簿(4)

          隨筆分類(157)

          隨筆檔案(125)

          收藏夾(13)

          Java GC

          Java General

          NoSQL

          Tech General

          Tech Master

          最新隨筆

          搜索

          •  

          積分與排名

          • 積分 - 895292
          • 排名 - 42

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          前記

          很早以前就有讀Netty源碼的打算了,然而第一次嘗試的時(shí)候從Netty4開始,一直抓不到核心的框架流程,后來因?yàn)槠渌虑槊χ头畔铝?。這次趁著休假重新?lián)炱疬@個(gè)硬骨頭,因?yàn)镹etty3現(xiàn)在還在被很多項(xiàng)目使用,因而這次決定先從Netty3入手,瞬間發(fā)現(xiàn)Netty3的代碼比Netty4中規(guī)中矩的多,很多概念在代碼本身中都有清晰的表達(dá),所以半天就把整個(gè)框架的骨架搞清楚了。再讀Netty4對(duì)Netty3的改進(jìn)總結(jié),回去讀Netty4的源碼,反而覺得輕松了,一種豁然開朗的感覺。

          記得去年讀Jetty源碼的時(shí)候,因?yàn)榇a太龐大,并且自己的HTTP Server的了解太少,因而只能自底向上的一個(gè)一個(gè)模塊的疊加,直到最后把所以的模塊連接在一起而看清它的真正核心骨架?,F(xiàn)在讀源碼,開始習(xí)慣先把骨架理清,然后延伸到不同的器官、血肉而看清整個(gè)人體。

          本文從Reactor模式在Netty3中的應(yīng)用,引出Netty3的整體架構(gòu)以及控制流程;然而除了Reactor模式,Netty3還在ChannelPipeline中使用了Intercepting Filter模式,這個(gè)模式也在Servlet的Filter中成功使用,因而本文還會(huì)從Intercepting Filter模式出發(fā)詳細(xì)介紹ChannelPipeline的設(shè)計(jì)理念。本文假設(shè)讀者已經(jīng)對(duì)Netty有一定的了解,因而不會(huì)包含過多入門介紹,以及幫Netty做宣傳的文字。

          Netty3中的Reactor模式

          Reactor模式在Netty中應(yīng)用非常成功,因而它也是在Netty中受大肆宣傳的模式,關(guān)于Reactor模式可以詳細(xì)參考本人的另一篇文章《Reactor模式詳解》,對(duì)Reactor模式的實(shí)現(xiàn)是Netty3的基本骨架,因而本小節(jié)會(huì)詳細(xì)介紹Reactor模式如何應(yīng)用Netty3中。

          如果讀《Reactor模式詳解》,我們知道Reactor模式由Handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler構(gòu)成,在Java的實(shí)現(xiàn)版本中,Channel對(duì)應(yīng)Handle,Selector對(duì)應(yīng)Synchronous Event Demultiplexer,并且Netty3還使用了兩層Reactor:Main Reactor用于處理Client的連接請(qǐng)求,Sub Reactor用于處理和Client連接后的讀寫請(qǐng)求(關(guān)于這個(gè)概念還可以參考Doug Lea的這篇PPT:Scalable IO In Java)。所以我們先要解決Netty3中使用什么類實(shí)現(xiàn)所有的上述模塊并把他們聯(lián)系在一起的,以NIO實(shí)現(xiàn)方式為例:

          模式是一種抽象,但是在實(shí)現(xiàn)中,經(jīng)常會(huì)因?yàn)檎Z言特性、框架和性能需要而做一些改變,因而Netty3對(duì)Reactor模式的實(shí)現(xiàn)有一套自己的設(shè)計(jì):
          1. ChannelEvent:Reactor是基于事件編程的,因而在Netty3中使用ChannelEvent抽象的表達(dá)Netty3內(nèi)部可以產(chǎn)生的各種事件,所有這些事件對(duì)象在Channels幫助類中產(chǎn)生,并且由它將事件推入到ChannelPipeline中,ChannelPipeline構(gòu)建ChannelHandler管道,ChannelEvent流經(jīng)這個(gè)管道實(shí)現(xiàn)所有的業(yè)務(wù)邏輯處理。ChannelEvent對(duì)應(yīng)的事件有:ChannelStateEvent表示Channel狀態(tài)的變化事件,而如果當(dāng)前Channel存在Parent Channel,則該事件還會(huì)傳遞到Parent Channel的ChannelPipeline中,如OPEN、BOUND、CONNECTED、INTEREST_OPS等,該事件可以在各種不同實(shí)現(xiàn)的Channel、ChannelSink中產(chǎn)生;MessageEvent表示從Socket中讀取數(shù)據(jù)完成、需要向Socket寫數(shù)據(jù)或ChannelHandler對(duì)當(dāng)前Message解析(如Decoder、Encoder)后觸發(fā)的事件,它由NioWorker、需要對(duì)Message做進(jìn)一步處理的ChannelHandler產(chǎn)生;WriteCompletionEvent表示寫完成而觸發(fā)的事件,它由NioWorker產(chǎn)生;ExceptionEvent表示在處理過程中出現(xiàn)的Exception,它可以發(fā)生在各個(gè)構(gòu)件中,如Channel、ChannelSink、NioWorker、ChannelHandler中;IdleStateEvent由IdleStateHandler觸發(fā),這也是一個(gè)ChannelEvent可以無縫擴(kuò)展的例子。注:在Netty4后,已經(jīng)沒有ChannelEvent類,所有不同事件都用對(duì)應(yīng)方法表達(dá),這也意味這ChannelEvent不可擴(kuò)展,Netty4采用在ChannelInboundHandler中加入userEventTriggered()方法來實(shí)現(xiàn)這種擴(kuò)展,具體可以參考這里。
          2. ChannelHandler:在Netty3中,ChannelHandler用于表示Reactor模式中的EventHandler。ChannelHandler只是一個(gè)標(biāo)記接口,它有兩個(gè)子接口:ChannelDownstreamHandler和ChannelUpstreamHandler,其中ChannelDownstreamHandler表示從用戶應(yīng)用程序流向Netty3內(nèi)部直到向Socket寫數(shù)據(jù)的管道,在Netty4中改名為ChannelOutboundHandler;ChannelUpstreamHandler表示數(shù)據(jù)從Socket進(jìn)入Netty3內(nèi)部向用戶應(yīng)用程序做數(shù)據(jù)處理的管道,在Netty4中改名為ChannelInboundHandler。
          3. ChannelPipeline:用于管理ChannelHandler的管道,每個(gè)Channel一個(gè)ChannelPipeline實(shí)例,可以運(yùn)行過程中動(dòng)態(tài)的向這個(gè)管道中添加、刪除ChannelHandler(由于實(shí)現(xiàn)的限制,在最末端的ChannelHandler向后添加或刪除ChannelHandler不一定在當(dāng)前執(zhí)行流程中起效,參考這里)。ChannelPipeline內(nèi)部維護(hù)一個(gè)ChannelHandler的雙向鏈表,它以Upstream(Inbound)方向?yàn)檎颍珼ownstream(Outbound)方向?yàn)榉较颉hannelPipeline采用Intercepting Filter模式實(shí)現(xiàn),具體可以參考這里,這個(gè)模式的實(shí)現(xiàn)在后一節(jié)中還是詳細(xì)介紹。
          4. NioSelector:Netty3使用NioSelector來存放Selector(Synchronous Event Demultiplexer),每個(gè)新產(chǎn)生的NIO Channel都向這個(gè)Selector注冊(cè)自己以讓這個(gè)Selector監(jiān)聽這個(gè)NIO Channel中發(fā)生的事件,當(dāng)事件發(fā)生時(shí),調(diào)用幫助類Channels中的方法生成ChannelEvent實(shí)例,將該事件發(fā)送到這個(gè)Netty Channel對(duì)應(yīng)的ChannelPipeline中,而交給各級(jí)ChannelHandler處理。其中在向Selector注冊(cè)NIO Channel時(shí),Netty Channel實(shí)例以Attachment的形式傳入,該Netty Channel在其內(nèi)部的NIO Channel事件發(fā)生時(shí),會(huì)以Attachment的形式存在于SelectionKey中,因而每個(gè)事件可以直接從這個(gè)Attachment中獲取相關(guān)鏈的Netty Channel,并從Netty Channel中獲取與之相關(guān)聯(lián)的ChannelPipeline,這個(gè)實(shí)現(xiàn)和Doug Lea的Scalable IO In Java一模一樣。另外Netty3還采用了Scalable IO In Java中相同的Main Reactor和Sub Reactor設(shè)計(jì),其中NioSelector的兩個(gè)實(shí)現(xiàn):Boss即為Main Reactor,NioWorker為Sub Reactor。Boss用來處理新連接加入的事件,NioWorker用來處理各個(gè)連接對(duì)Socket的讀寫事件,其中Boss通過NioWorkerPool獲取NioWorker實(shí)例,Netty3模式使用RoundRobin方式放回NioWorker實(shí)例。更形象一點(diǎn)的,可以通過Scalable IO In Java的這張圖表達(dá):

          若與Ractor模式對(duì)應(yīng),NioSelector中包含了Synchronous Event Demultiplexer,而ChannelPipeline中管理著所有EventHandler,因而NioSelector和ChannelPipeline共同構(gòu)成了Initiation Dispatcher。
          5. ChannelSink:在ChannelHandler處理完成所有邏輯需要向客戶端寫響應(yīng)數(shù)據(jù)時(shí),一般會(huì)調(diào)用Netty Channel中的write方法,然而在這個(gè)write方法實(shí)現(xiàn)中,它不是直接向其內(nèi)部的Socket寫數(shù)據(jù),而是交給Channels幫助類,內(nèi)部創(chuàng)建DownstreamMessageEvent,反向從ChannelPipeline的管道中流過去,直到第一個(gè)ChannelHandler處理完畢,最后交給ChannelSink處理,以避免阻塞寫而影響程序的吞吐量。ChannelSink將這個(gè)MessageEvent提交給Netty Channel中的writeBufferQueue,最后NioWorker會(huì)等到這個(gè)NIO Channel已經(jīng)可以處理寫事件時(shí)無阻塞的向這個(gè)NIO Channel寫數(shù)據(jù)。這就是上圖的send是從SubReactor直接出發(fā)的原因。
          6. Channel:Netty有自己的Channel抽象,它是一個(gè)資源的容器,包含了所有一個(gè)連接涉及到的所有資源的飲用,如封裝NIO Channel、ChannelPipeline、Boss、NioWorkerPool等。另外它還提供了向內(nèi)部NIO Channel寫響應(yīng)數(shù)據(jù)的接口write、連接/綁定到某個(gè)地址的connect/bind接口等,個(gè)人感覺雖然對(duì)Channel本身來說,因?yàn)樗庋b了NIO Channel,因而這些接口定義在這里是合理的,但是如果考慮到Netty的架構(gòu),它的Channel只是一個(gè)資源容器,有這個(gè)Channel實(shí)例就可以得到和它相關(guān)的基本所有資源,因而這種write、connect、bind動(dòng)作不應(yīng)該再由它負(fù)責(zé),而是應(yīng)該由其他類來負(fù)責(zé),比如在Netty4中就在ChannelHandlerContext添加了write方法,雖然netty4并沒有刪除Channel中的write接口。

          Netty3中的Intercepting Filter模式

          如果說Reactor模式是Netty3的骨架,那么Intercepting Filter模式則是Netty的中樞。Reactor模式主要應(yīng)用在Netty3的內(nèi)部實(shí)現(xiàn),它是Netty3具有良好性能的基礎(chǔ),而Intercepting Filter模式則是ChannelHandler組合實(shí)現(xiàn)一個(gè)應(yīng)用程序邏輯的基礎(chǔ),只有很好的理解了這個(gè)模式才能使用好Netty,甚至能得心應(yīng)手。

          關(guān)于Intercepting Filter模式的詳細(xì)介紹可以參考這里,本節(jié)主要介紹Netty3中對(duì)Intercepting Filter模式的實(shí)現(xiàn),其實(shí)就是DefaultChannelPipeline對(duì)Intercepting Filter模式的實(shí)現(xiàn)。在上文有提到Netty3的ChannelPipeline是ChannelHandler的容器,用于存儲(chǔ)與管理ChannelHandler,同時(shí)它在Netty3中也起到橋梁的作用,即它是連接Netty3內(nèi)部到所有ChannelHandler的橋梁。作為ChannelPipeline的實(shí)現(xiàn)者DefaultChannelPipeline,它使用一個(gè)ChannelHandler的雙向鏈表來存儲(chǔ),以DefaultChannelPipelineContext作為節(jié)點(diǎn):
          public interface ChannelHandlerContext {
              Channel getChannel();

              ChannelPipeline getPipeline();

              String getName();

              ChannelHandler getHandler();

              
          boolean canHandleUpstream();
              
          boolean canHandleDownstream();
              
          void sendUpstream(ChannelEvent e);
              
          void sendDownstream(ChannelEvent e);
              Object getAttachment();

              
          void setAttachment(Object attachment);
          }

          private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
             
          volatile DefaultChannelHandlerContext next;
             
          volatile DefaultChannelHandlerContext prev;
             
          private final String name;
             
          private final ChannelHandler handler;
             
          private final boolean canHandleUpstream;
             
          private final boolean canHandleDownstream;
             
          private volatile Object attachment;
          .....
          }
          在DefaultChannelPipeline中,它存儲(chǔ)了和當(dāng)前ChannelPipeline相關(guān)聯(lián)的Channel、ChannelSink以及ChannelHandler鏈表的head、tail,所有ChannelEvent通過sendUpstream、sendDownstream為入口流經(jīng)整個(gè)鏈表:
          public class DefaultChannelPipeline implements ChannelPipeline {
              
          private volatile Channel channel;
              
          private volatile ChannelSink sink;
              
          private volatile DefaultChannelHandlerContext head;
              
          private volatile DefaultChannelHandlerContext tail;
          ......
              
          public void sendUpstream(ChannelEvent e) {
                  DefaultChannelHandlerContext head 
          = getActualUpstreamContext(this.head);
                  
          if (head == null) {
                      
          return;
                  }
                  sendUpstream(head, e);
              }

              
          void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
                  
          try {
                      ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
                  } 
          catch (Throwable t) {
                      notifyHandlerException(e, t);
                  }
              }

              
          public void sendDownstream(ChannelEvent e) {
                  DefaultChannelHandlerContext tail 
          = getActualDownstreamContext(this.tail);
                  
          if (tail == null) {
                      
          try {
                          getSink().eventSunk(
          this, e);
                          
          return;
                      } 
          catch (Throwable t) {
                          notifyHandlerException(e, t);
                          
          return;
                      }
                  }
                  sendDownstream(tail, e);
              }

              
          void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
                  
          if (e instanceof UpstreamMessageEvent) {
                      
          throw new IllegalArgumentException("cannot send an upstream event to downstream");
                  }
                  
          try {
                      ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
                  } 
          catch (Throwable t) {
                      e.getFuture().setFailure(t);
                      notifyHandlerException(e, t);
                  }
              }
          對(duì)Upstream事件,向后找到所有實(shí)現(xiàn)了ChannelUpstreamHandler接口的ChannelHandler組成鏈(getActualUpstreamContext()),而對(duì)Downstream事件,向前找到所有實(shí)現(xiàn)了ChannelDownstreamHandler接口的ChannelHandler組成鏈(getActualDownstreamContext()):
              private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
                  
          if (ctx == null) {
                      
          return null;
                  }
                  DefaultChannelHandlerContext realCtx 
          = ctx;
                  
          while (!realCtx.canHandleUpstream()) {
                      realCtx 
          = realCtx.next;
                      
          if (realCtx == null) {
                          
          return null;
                      }
                  }
                  
          return realCtx;
              }
              
          private DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
                  
          if (ctx == null) {
                      
          return null;
                  }
                  DefaultChannelHandlerContext realCtx 
          = ctx;
                  
          while (!realCtx.canHandleDownstream()) {
                      realCtx 
          = realCtx.prev;
                      
          if (realCtx == null) {
                          
          return null;
                      }
                  }
                  
          return realCtx;
              }
          在實(shí)際實(shí)現(xiàn)ChannelUpstreamHandler或ChannelDownstreamHandler時(shí),調(diào)用 ChannelHandlerContext中的sendUpstream或sendDownstream方法將控制流程交給下一個(gè) ChannelUpstreamHandler或下一個(gè)ChannelDownstreamHandler,或調(diào)用Channel中的write方法發(fā)送 響應(yīng)消息。
          public class MyChannelUpstreamHandler implements ChannelUpstreamHandler {
              
          public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
                  
          // handle current logic, use Channel to write response if needed.
                  
          // ctx.getChannel().write(message);
                  ctx.sendUpstream(e);
              }
          }

          public class MyChannelDownstreamHandler implements ChannelDownstreamHandler {
              
          public void handleDownstream(
                      ChannelHandlerContext ctx, ChannelEvent e) 
          throws Exception {
                  
          // handle current logic
                  ctx.sendDownstream(e);
              }
          }
          當(dāng)ChannelHandler向ChannelPipelineContext發(fā)送事件時(shí),其內(nèi)部從當(dāng)前ChannelPipelineContext節(jié)點(diǎn)出發(fā)找到下一個(gè)ChannelUpstreamHandler或ChannelDownstreamHandler實(shí)例,并向其發(fā)送ChannelEvent,對(duì)于Downstream鏈,如果到達(dá)鏈尾,則將ChannelEvent發(fā)送給ChannelSink:
          public void sendDownstream(ChannelEvent e) {
              DefaultChannelHandlerContext prev 
          = getActualDownstreamContext(this.prev);
             
          if (prev == null) {
                 
          try {
                      getSink().eventSunk(DefaultChannelPipeline.
          this, e);
                  } 
          catch (Throwable t) {
                      notifyHandlerException(e, t);
                  }
              } 
          else {
                  DefaultChannelPipeline.
          this.sendDownstream(prev, e);
              }
          }

          public void sendUpstream(ChannelEvent e) {
              DefaultChannelHandlerContext next 
          = getActualUpstreamContext(this.next);
             
          if (next != null) {
                  DefaultChannelPipeline.
          this.sendUpstream(next, e);
              }
          }
          正是因?yàn)檫@個(gè)實(shí)現(xiàn),如果在一個(gè)末尾的ChannelUpstreamHandler中先移除自己,在向末尾添加一個(gè)新的ChannelUpstreamHandler,它是無效的,因?yàn)樗膎ext已經(jīng)在調(diào)用前就固定設(shè)置為null了。

          ChannelPipeline作為ChannelHandler的容器,它還提供了各種增、刪、改ChannelHandler鏈表中的方法,而且如果某個(gè)ChannelHandler還實(shí)現(xiàn)了LifeCycleAwareChannelHandler,則該ChannelHandler在被添加進(jìn)ChannelPipeline或從中刪除時(shí)都會(huì)得到同志:
          public interface LifeCycleAwareChannelHandler extends ChannelHandler {
              
          void beforeAdd(ChannelHandlerContext ctx) throws Exception;
              
          void afterAdd(ChannelHandlerContext ctx) throws Exception;
              
          void beforeRemove(ChannelHandlerContext ctx) throws Exception;
              
          void afterRemove(ChannelHandlerContext ctx) throws Exception;
          }

          public interface ChannelPipeline {
              
          void addFirst(String name, ChannelHandler handler);
              
          void addLast(String name, ChannelHandler handler);
              
          void addBefore(String baseName, String name, ChannelHandler handler);
              
          void addAfter(String baseName, String name, ChannelHandler handler);
              
          void remove(ChannelHandler handler);
              ChannelHandler remove(String name);

              
          <extends ChannelHandler> T remove(Class<T> handlerType);
              ChannelHandler removeFirst();

              ChannelHandler removeLast();

              
          void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
              ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

              
          <extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
              ChannelHandler getFirst();

              ChannelHandler getLast();

              ChannelHandler get(String name);

              
          <extends ChannelHandler> T get(Class<T> handlerType);
              ChannelHandlerContext getContext(ChannelHandler handler);

              ChannelHandlerContext getContext(String name);

              ChannelHandlerContext getContext(Class
          <? extends ChannelHandler> handlerType);
              
          void sendUpstream(ChannelEvent e);
              
          void sendDownstream(ChannelEvent e);
              ChannelFuture execute(Runnable task);

              Channel getChannel();

              ChannelSink getSink();

              
          void attach(Channel channel, ChannelSink sink);
              
          boolean isAttached();
              List
          <String> getNames();
              Map
          <String, ChannelHandler> toMap();
          }

          在DefaultChannelPipeline的ChannelHandler鏈條的處理流程為:

          參考:

          《Netty主頁》
          《Netty源碼解讀(四)Netty與Reactor模式》
          《Netty代碼分析》
          Scalable IO In Java
          Intercepting Filter Pattern
          posted on 2015-09-04 09:40 DLevin 閱讀(7677) 評(píng)論(0)  編輯  收藏 所屬分類: Architecture 、Netty

          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 通辽市| 丘北县| 肥乡县| 南昌市| 台北市| 南和县| 巴塘县| 惠安县| 扶沟县| 平和县| 西贡区| 阿克陶县| 津南区| 峨眉山市| 富蕴县| 南雄市| 桃源县| 松江区| 阿图什市| 枣庄市| 龙陵县| 杭州市| 资中县| 康平县| 长沙县| 石棉县| 雅安市| 罗甸县| 同德县| 丰镇市| 北辰区| 娄烦县| 曲阳县| 道真| 革吉县| 柯坪县| 军事| 林周县| 罗江县| 高要市| 文登市|