問題描述
在服務器編程中,通常需要處理多種不同的請求,在正式處理請求之前,需要對請求做一些預處理,如:- 紀錄每個Client的每次訪問信息。
- 對Client進行認證和授權檢查(Authentication and Authorization)。
- 檢查當前Session是否合法。
- 檢查Client的IP地址是否可信賴或不可信賴(IP地址白名單、黑名單)。
- 請求數(shù)據(jù)是否先要解壓或解碼。
- 是否支持Client請求的類型、Browser版本等。
- 添加性能監(jiān)控信息。
- 添加調(diào)試信息。
- 保證所有異常都被正確捕獲到,對未預料到的異常做通用處理,防止給Client看到內(nèi)部堆棧信息。
在響應返回給客戶端之前,有時候也需要做一些預處理再返回:
- 對響應消息編碼或壓縮。
- 為所有響應添加公共頭、尾等消息。
- 進一步Enrich響應消息,如添加公共字段、Session信息、Cookie信息,甚至完全改變響應消息等。
問題解決
要實現(xiàn)這種需求,最直觀的方法就是在每個請求處理過程中添加所有這些邏輯,為了減少代碼重復,可以將所有這些檢查提取成方法,這樣在每個處理方法中調(diào)用即可:public Response service1(Request request) {
validate(request);
request = transform(request);
Response response = process1(request);
return transform(response);
}
此時,如果出現(xiàn)service2方法,依然需要拷貝service1中的實現(xiàn),然后將process1換成process2即可。這個時候我們發(fā)現(xiàn)很多重復代碼,繼續(xù)對它重構,比如提取公共邏輯到基類成模版方法,這種使用繼承的方式會引起子類對父類的耦合,如果要讓某些模塊變的可配置需要有太多的判斷邏輯,代碼變的臃腫;因而可以更進一步,將所有處理邏輯抽象出一個Processor接口,然后使用Decorate模式(即引用優(yōu)于繼承):validate(request);
request = transform(request);
Response response = process1(request);
return transform(response);
}
public interface Processor {
Response process(Request request);
}
public class CoreProcessor implements Processor {
public Response process(Request request) {
// do process/calculation
}
}
public class DecoratedProcessor implements Processor {
private final Processor innerProcessor;
public DecoratedProcessor(Processor processor) {
this.innerProcessor = processor;
}
public Response process(Request request) {
request = preProcess(request);
Response response = innerProcessor.process(request);
response = postProcess(response);
return response;
}
protected Request preProcess(Request request) {
return request;
}
protected Response postProcess(Response response) {
return response;
}
}
public void Transformer extends DecoratedProcessor {
public Transformer(Processor processor) {
super(processor);
}
protected Request preProcess(Request request) {
return transformRequest(request);
}
protected Response postProcess(Response response) {
return transformResponse(response);
}
}
此時,如果需要在真正的處理邏輯之前加入其他的預處理邏輯,只需要繼承DecoratedProcessor,實現(xiàn)preProcess或postProcess方法,分別在請求處理之前和請求處理之后橫向切入一些邏輯,也就是所謂的AOP編程:面向切面的編程,然后只需要根據(jù)需求構建這個鏈條:Response process(Request request);
}
public class CoreProcessor implements Processor {
public Response process(Request request) {
// do process/calculation
}
}
public class DecoratedProcessor implements Processor {
private final Processor innerProcessor;
public DecoratedProcessor(Processor processor) {
this.innerProcessor = processor;
}
public Response process(Request request) {
request = preProcess(request);
Response response = innerProcessor.process(request);
response = postProcess(response);
return response;
}
protected Request preProcess(Request request) {
return request;
}
protected Response postProcess(Response response) {
return response;
}
}
public void Transformer extends DecoratedProcessor {
public Transformer(Processor processor) {
super(processor);
}
protected Request preProcess(Request request) {
return transformRequest(request);
}
protected Response postProcess(Response response) {
return transformResponse(response);
}
}
Processor processor = new MissingExceptionCatcher(new Debugger(new Transformer(new CoreProcessor());
Response response = processor.process(request);
......
這已經(jīng)是相對比較好的設計了,每個Processor只需要關注自己的實現(xiàn)邏輯即可,代碼變的簡潔;并且每個Processor各自獨立,可重用性好,測試方便;整條鏈上能實現(xiàn)的功能只是取決于鏈的構造,因而只需要有一種方法配置鏈的構造即可,可配置性也變得靈活;然而很多時候引用是一種靜態(tài)的依賴,而無法滿足動態(tài)的需求。要構造這條鏈,每個前置Processor需要知道其后的Processor,這在某些情況下并不是在起初就知道的。此時,我們需要引入Intercepting Filter模式來實現(xiàn)動態(tài)的改變條鏈。Response response = processor.process(request);
......
Intercepting Filter模式
在前文已經(jīng)構建了一條由引用而成的Processor鏈,然而這是一條靜態(tài)鏈,并且需要一開始就能構造出這條鏈,為了解決這個限制,我們可以引入一個ProcessorChain來維護這條鏈,并且這條鏈可以動態(tài)的構建。有多種方式可以實現(xiàn)并控制這個鏈:
- 在存儲上,可以使用數(shù)組來存儲所有的Processor,Processor在數(shù)組中的位置表示這個Processor在鏈條中的位置;也可以用鏈表來存儲所有的Processor,此時Processor在這個鏈表中的位置即是在鏈中的位置。
- 在抽象上,可以所有的邏輯都封裝在Processor中,也可以將核心邏輯使用Processor抽象,而外圍邏輯使用Filter抽象。
- 在流程控制上,一般通過在Processor實現(xiàn)方法中直接使用ProcessorChain實例(通過參數(shù)摻入)來控制流程,利用方法調(diào)用的進棧出棧的特性實現(xiàn)preProcess()和postProcess()處理。
Intercepting Filter模式在Servlet的Filter中的實現(xiàn)(Jetty版本)
其中Servlet的Filter在Jetty的實現(xiàn)中使用數(shù)組存儲Filter,F(xiàn)ilter末尾可以使用Servlet實例處理真正的業(yè)務邏輯,在流程控制上,使用FilterChain的doFilter方法來實現(xiàn)。如FilterChain在Jetty中的實現(xiàn):public void doFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException
// pass to next filter
if (_filter < LazyList.size(_chain)) {
FilterHolder holder= (FilterHolder)LazyList.get(_chain, _filter++);
Filter filter= holder.getFilter();
filter.doFilter(request, response, this);
return;
}
// Call servlet
HttpServletRequest srequest = (HttpServletRequest)request;
if (_servletHolder != null) {
_servletHolder.handle(_baseRequest,request, response);
}
}
這里,_chain實際上是一個Filter的ArrayList,由FilterChain調(diào)用doFilter()啟動調(diào)用第一個Filter的doFilter()方法,在實際的Filter實現(xiàn)中,需要手動的調(diào)用FilterChain.doFilter()方法來啟動下一個Filter的調(diào)用,利用方法調(diào)用的進棧出棧的特性實現(xiàn)Request的pre-process和Response的post-process處理。如果不調(diào)用FilterChain.doFilter()方法,則表示不需要調(diào)用之后的Filter,流程從當前Filter返回,在它之前的Filter的FilterChain.doFilter()調(diào)用之后的邏輯反向處理直到第一個Filter處理完成而返回。// pass to next filter
if (_filter < LazyList.size(_chain)) {
FilterHolder holder= (FilterHolder)LazyList.get(_chain, _filter++);
Filter filter= holder.getFilter();
filter.doFilter(request, response, this);
return;
}
// Call servlet
HttpServletRequest srequest = (HttpServletRequest)request;
if (_servletHolder != null) {
_servletHolder.handle(_baseRequest,request, response);
}
}
public class MyFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
// pre-process ServletRequest
chain.doFilter(request, response);
// post-process Servlet Response
}
}
整個Filter鏈的處理流程如下:public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
// pre-process ServletRequest
chain.doFilter(request, response);
// post-process Servlet Response
}
}

Intercepting Filter模式在Netty3中的實現(xiàn)
Netty3在DefaultChannelPipeline中實現(xiàn)了Intercepting Filter模式,其中ChannelHandler是它的Filter。在Netty3的DefaultChannelPipeline中,使用一個以ChannelHandlerContext為節(jié)點的雙向鏈表來存儲ChannelHandler,所有的橫切面邏輯和實際業(yè)務邏輯都用ChannelHandler表達,在控制流程上使用ChannelHandlerContext的sendDownstream()和sendUpstream()方法來控制流程。不同于Servlet的Filter,ChannelHandler有兩個子接口:ChannelUpstreamHandler和ChannelDownstreamHandler分別用來請求進入時的處理流程和響應出去時的處理流程。對于Client的請求,從DefaultChannelPipeline的sendUpstream()方法入口:public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e)
} catch (Throwable t) {
e.getFuture().setFailure(t);
notifyHandlerException(e, t);
}
}
如果有響應消息,該消息從DefaultChannelPipeline的sendDownstream()方法為入口:DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof UpstreamMessageEvent) {
throw new IllegalArgumentException("cannot send an upstream event to downstream");
}
try {
((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e)
} catch (Throwable t) {
e.getFuture().setFailure(t);
notifyHandlerException(e, t);
}
}
public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
if (head == null) {
return;
}
sendUpstream(head, e);
}
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}
在實際實現(xiàn)ChannelUpstreamHandler或ChannelDownstreamHandler時,調(diào)用ChannelHandlerContext中的sendUpstream或sendDownstream方法將控制流程交給下一個ChannelUpstreamHandler或下一個ChannelDownstreamHandler,或調(diào)用Channel中的write方法發(fā)送響應消息。DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
if (head == null) {
return;
}
sendUpstream(head, e);
}
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}
public class MyChannelUpstreamHandler implements ChannelUpstreamHandler {
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// handle current logic, use Channel to write response if needed.
// ctx.getChannel().write(message);
ctx.sendUpstream(e);
}
}
public class MyChannelDownstreamHandler implements ChannelDownstreamHandler {
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// handle current logic
ctx.sendDownstream(e);
}
}
當ChannelHandler向ChannelPipelineContext發(fā)送事件時,其內(nèi)部從當前ChannelPipelineContext
節(jié)點出發(fā)找到下一個ChannelUpstreamHandler或ChannelDownstreamHandler實例,并向其發(fā)送
ChannelEvent,對于Downstream鏈,如果到達鏈尾,則將ChannelEvent發(fā)送給ChannelSink:public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// handle current logic, use Channel to write response if needed.
// ctx.getChannel().write(message);
ctx.sendUpstream(e);
}
}
public class MyChannelDownstreamHandler implements ChannelDownstreamHandler {
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// handle current logic
ctx.sendDownstream(e);
}
}
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
if (prev == null) {
try {
getSink().eventSunk(DefaultChannelPipeline.this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
} else {
DefaultChannelPipeline.this.sendDownstream(prev, e);
}
}
public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
if (next != null) {
DefaultChannelPipeline.this.sendUpstream(next, e);
}
}
正是因為這個實現(xiàn),如果在一個末尾的ChannelUpstreamHandler中先移除自己,在向末尾添加一個新的ChannelUpstreamHandler,它是無效的,因為它的next已經(jīng)在調(diào)用前就固定設置為null了。DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
if (prev == null) {
try {
getSink().eventSunk(DefaultChannelPipeline.this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
} else {
DefaultChannelPipeline.this.sendDownstream(prev, e);
}
}
public void sendUpstream(ChannelEvent e) {
DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
if (next != null) {
DefaultChannelPipeline.this.sendUpstream(next, e);
}
}
在DefaultChannelPipeline的ChannelHandler鏈條的處理流程為:

在這個實現(xiàn)中,不像Servlet的Filter實現(xiàn)利用方法調(diào)用棧的進出棧來完成pre-process和post-process,而是在進去的鏈和出來的鏈各自調(diào)用handleUpstream()和handleDownstream()方法,這樣會引起調(diào)用棧其實是兩條鏈的總和,因而需要注意這條鏈的總長度。這樣做的好處是這條ChannelHandler的鏈不依賴于方法調(diào)用棧,而是在DefaultChannelPipeline內(nèi)部本身的鏈,因而在handleUpstream()或handleDownstream()可以隨時將執(zhí)行流程轉發(fā)給其他線程或線程池,只需要保留ChannelPipelineContext引用,在處理完成后用這個ChannelPipelineContext重新向這條鏈的后一個節(jié)點發(fā)送ChannelEvent,然而由于Servlet的Filter依賴于方法的調(diào)用棧,因而方法返回意味著所有執(zhí)行完成,這種限制在異步編程中會引起問題,因而Servlet在3.0后引入了Async的支持。
Intercepting Filter模式的缺點
簡單提一下這個模式的缺點:1. 相對傳統(tǒng)的編程模型,這個模式有一定的學習曲線,需要很好的理解該模式后才能靈活的應用它來編程。
2. 需要劃分不同的邏輯到不同的Filter中,這有些時候并不是那么容易。
3. 各個Filter之間共享數(shù)據(jù)將變得困難。在Netty3中可以自定義自己的ChannelEvent來實現(xiàn)自定義消息的傳輸,或者使用ChannelPipelineContext的Attachment字段來實現(xiàn)消息傳輸,而Servlet中的Filter則沒有提供類似的機制,如果不是可以配置的數(shù)據(jù)在Config中傳遞,其他時候的數(shù)據(jù)共享需要其他機制配合完成。