markdown形式,簡書地址:基于Netty構(gòu)建服務(wù)的基本步驟
基于netty構(gòu)建服務(wù)的基本步驟我們通過netty實現(xiàn)一個Http服務(wù)器的功能,來說明通過netty構(gòu)建的Server基本步驟。學習一個新的知識點,都是通過Hello world開始的,對于netty的學習寫一個Hello world程序不像寫其他程序那么簡單,這里涉及很多非常重要的組件,比如ChannelHandler、EeventLoopGroup、ChannelPipeline等,這些組件隨著后續(xù)不斷學習再一一分析其實現(xiàn)原理。基于netty構(gòu)建Http服務(wù)器基本步驟實踐:1. 首先我們定義兩個線程組 也叫做事件循環(huán)組EevetLoopGroup bossGroup = new NioEevetLoopGroup();
EevetLoopGroup workerGroup = new NioEevetLoopGroup();為什么定義兩個線程組,實際上一個線程組也能完成所需的功能,不過netty建議我們使用兩個線程組,分別具有不同的職責。bossGroup目的是獲取客戶端連接,連接接收到之后再將連接轉(zhuǎn)發(fā)給workerGroup去處理。2. 定義一個輕量級的啟動服務(wù)類 ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, wokrerGroup).channel(NioServerSocketChannel.class).childHandler(null);
// 服務(wù)啟動后通過綁定到8899端口上,返回ChannelFuture。
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync(); 3. 通過ChannelPipeline初始化處理器,類似于攔截器Chain,當客戶端首次連接后即調(diào)用initChannel方法完成初始化動作。[示例代碼]public class TestServerInitializer extends ChannelInitializer<SocketChannel>{
// 初始化器,服務(wù)端啟動后會自動調(diào)用這個方法,它是一個回調(diào)方法。
@Override protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel invoked
"); // 有客戶端連接就會執(zhí)行.
ChannelPipeline channelPipeline = ch.pipeline(); // pipeline一個管道里面可以有很多的ChannelHandler,相當于包含很多個攔截器。
// 添加處理器,可以添加多個,并且可以將處理器放到pipeline管道的不同位置上。
channelPipeline.addLast("httpServerCodec", new HttpServerCodec()); //HttpServerCodec也是一個很重要的組件.
channelPipeline.addLast("httpServerHandler", new TestHttpServerHandler()); // 自定義處理器
}
}4. 創(chuàng)建自定義處理器,通常繼承SimpleChannelInboundHandler<T>, 該處理器覆寫channelRead0方法,該方法負責請求接入,讀取客戶端請求,發(fā)送響應(yīng)給客戶端。[示例代碼]public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
private final String FAVICON_ICO = "/favicon.ico";
// 讀取客戶端請求,向客戶端響應(yīng)的方法,所以這里要構(gòu)造響應(yīng)返回給客戶端。
// 注意:這里面跟Servlet沒有任何關(guān)系,也符合Servlet規(guī)范,所以不會涉及到HttpServerltRequest和HttpServeletResponse對象。
@Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("--------------httpserverHandler, remote_address " + ctx.channel().remoteAddress() + ", msg_class:" + msg.getClass());
// Thread.sleep(3000); // 休眠5秒鐘,lsof -i:8899 查看TCP連接狀態(tài)
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
System.out.println("請求方法: " + httpRequest.method() + ", 請求path: " + uri.getPath());
if (FAVICON_ICO.equals(uri.getPath())) {
System.out.println("請求/favicon.ico");
return;
}
// BytBuf:構(gòu)造給客戶端的響應(yīng)內(nèi)容, 制定好編碼
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
// 接下構(gòu)造響應(yīng)對象
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
// 調(diào)用flush才會將內(nèi)容真正返回給客戶端
System.out.println("響應(yīng)給客戶端對象: " + response);
ctx.writeAndFlush(response);
ctx.channel().closeFuture();
}
}
//------以下重寫了ChannelInboundHandlerAdapter父類的方法,分析不同事件方法的調(diào)用時機------
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel register invoked");
super.channelRegistered(ctx);
}
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel unregister invoked");
super.channelUnregistered(ctx);
}
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active invoked");
super.channelActive(ctx);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel inactive invoked");
super.channelInactive(ctx);
}
// TODO 這里執(zhí)行了2次,具體有待分析
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel read complete");
super.channelReadComplete(ctx);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exception caught invoked");
super.exceptionCaught(ctx, cause);
}
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler add invoked");
super.handlerAdded(ctx);
}
}這里注意,我使用的netty4.x版本,方法名叫做channelRead0,如果在其他文章中看到是messageReceived方法,則使用的是netty5.x,另外,因netty5.x已被廢棄,故建議都使用netty4.x穩(wěn)定版。5. 將步驟1和2整合,寫Main方法啟動服務(wù)[示例代碼]public class TestNettyServer {
public static void main(String[] args) throws InterruptedException {
// 服務(wù)器端可以理解為while(true){}死循環(huán),去不斷的接受請求連接。 EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 啟動服務(wù)端,這里的處理器都要是多實例的. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());
System.out.println("服務(wù)端已啟動..");
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
System.out.println("服務(wù)端shutdown");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}6. 通過瀏覽器或者curl方式訪問8899端口。最后,通過 curl ‘localhost:8899’訪問成功返回Hello World字符串,如果TestHttpServerHandler的channelRead0中不加msg instanceof HttpRequest的判斷,則運行時會拋出如下異常:java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) at java.lang.Thread.run(Thread.java:745)```debug代碼時會發(fā)現(xiàn)有2個請求執(zhí)行了channelRead0方法,兩個msg分別是--------------httpserverHandler msg:class io.netty.handler.codec.http.DefaultHttpRequest--------------httpserverHandler msg:class io.netty.handler.codec.http.LastHttpContent$1我們看到第二次請求并不是HttpRequest對象,所以此處理器無法處理,通過瀏覽器訪問時,瀏覽器會自動的發(fā)起favion.cio圖片的請求,所以可以增加個判斷如果path是favion.cio則不往下執(zhí)行。TestHttpServerHandler是自己實現(xiàn)的處理器,繼承了SimpleChannelInboundHandler,SimpleChannelInboundHandler繼承了ChannelInboundHandlerAdapter類。**子類可重寫的方法及其含義**:`channelActive() ` >在到服務(wù)器的連接已經(jīng)建立之后將被調(diào)用(成為活躍狀態(tài))` channelRead0()` > 當從服務(wù)器接受到一條消息時被調(diào)用` exceptionCaught()` >在處理過程中引發(fā)異常時調(diào)用` channelReigster() ` >注冊到EventLoop上` handlerAdd() ` >Channel被添加方法` handlerRemoved()` >Channel被刪除方法` channelInActive() ` > Channel離開活躍狀態(tài),不再連接到某一遠端時被調(diào)用` channelUnRegistered()` >Channel從EventLoop上解除注冊` channelReadComplete()` >當Channel上的某個讀操作完成時被調(diào)用在步驟4中有打印輸出,通過curl ‘http://localhost:8899'訪問,執(zhí)行結(jié)果順序:服務(wù)端已啟動..initChannel invoked...handler add invokedchannel register invokedchannel active invoked--------------httpserverHandler, remote_address /127.0.0.1:50061, msg_class:class io.netty.handler.codec.http.DefaultHttpRequest請求方法: GET, 請求path: /響應(yīng)給客戶端對象: DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33))HTTP/1.1 200 OKcontent-length: 11content-type: text/plain--------------httpserverHandler, remote_address /127.0.0.1:50061, msg_class:class io.netty.handler.codec.http.LastHttpContent$1channel read complete // ?channel read completechannel inactive invokedchannel unregister invoked基本的hellworld程序已經(jīng)運行起來,并且自行實現(xiàn)的處理器調(diào)用過程通過重寫方法打印也能夠有所了解了。這里要注意的是,對于Netty來說,上層應(yīng)用獲取客戶端請求之后,當請求是基于Http1.1協(xié)議的話會有個keepalive時間,比如30秒鐘時間,如果在這段時間內(nèi)沒有接受到新的請求則由[服務(wù)端]主動關(guān)閉連接。當請求是基于Http1.0短連接協(xié)議,請求發(fā)過來之后,服務(wù)器就將這個連接關(guān)閉掉,上述示例中可以根據(jù)判斷調(diào)用ctx .channel().close()來關(guān)閉連接。**基于netty構(gòu)建服務(wù)基本流程總結(jié):**1. 創(chuàng)建EventLoopGroup實例2. 通過ServerBootstrap啟動服務(wù),bind到一個端口. 如果是客戶端,則使用Bootstrap,連接主機和端口. 3. 創(chuàng)建ChannelInitializer實例,通過ChannelPipieline初始化處理器鏈.4. 創(chuàng)建ChannelServerHandler實例,繼承SimpleChannelInboundHandler,重寫channelRead0方法(netty4.x).5. 將ChannelServerHandler實例addLast到ChannelPipeline上.6. 將ChannelInitializer實例childHandler到bootstrap上.
EevetLoopGroup bossGroup = new NioEevetLoopGroup();
EevetLoopGroup workerGroup = new NioEevetLoopGroup();
EevetLoopGroup workerGroup = new NioEevetLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, wokrerGroup).channel(NioServerSocketChannel.class).childHandler(null);
// 服務(wù)啟動后通過綁定到8899端口上,返回ChannelFuture。
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
serverBootstrap.group(bossGroup, wokrerGroup).channel(NioServerSocketChannel.class).childHandler(null);
// 服務(wù)啟動后通過綁定到8899端口上,返回ChannelFuture。
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
public class TestServerInitializer extends ChannelInitializer<SocketChannel>{
// 初始化器,服務(wù)端啟動后會自動調(diào)用這個方法,它是一個回調(diào)方法。
@Override protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel invoked
"); // 有客戶端連接就會執(zhí)行.
ChannelPipeline channelPipeline = ch.pipeline(); // pipeline一個管道里面可以有很多的ChannelHandler,相當于包含很多個攔截器。
// 添加處理器,可以添加多個,并且可以將處理器放到pipeline管道的不同位置上。
channelPipeline.addLast("httpServerCodec", new HttpServerCodec()); //HttpServerCodec也是一個很重要的組件.
channelPipeline.addLast("httpServerHandler", new TestHttpServerHandler()); // 自定義處理器
}
}
// 初始化器,服務(wù)端啟動后會自動調(diào)用這個方法,它是一個回調(diào)方法。
@Override protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel invoked

ChannelPipeline channelPipeline = ch.pipeline(); // pipeline一個管道里面可以有很多的ChannelHandler,相當于包含很多個攔截器。
// 添加處理器,可以添加多個,并且可以將處理器放到pipeline管道的不同位置上。
channelPipeline.addLast("httpServerCodec", new HttpServerCodec()); //HttpServerCodec也是一個很重要的組件.
channelPipeline.addLast("httpServerHandler", new TestHttpServerHandler()); // 自定義處理器
}
}
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
private final String FAVICON_ICO = "/favicon.ico";
// 讀取客戶端請求,向客戶端響應(yīng)的方法,所以這里要構(gòu)造響應(yīng)返回給客戶端。
// 注意:這里面跟Servlet沒有任何關(guān)系,也符合Servlet規(guī)范,所以不會涉及到HttpServerltRequest和HttpServeletResponse對象。
@Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("--------------httpserverHandler, remote_address " + ctx.channel().remoteAddress() + ", msg_class:" + msg.getClass());
// Thread.sleep(3000); // 休眠5秒鐘,lsof -i:8899 查看TCP連接狀態(tài)
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
System.out.println("請求方法: " + httpRequest.method() + ", 請求path: " + uri.getPath());
if (FAVICON_ICO.equals(uri.getPath())) {
System.out.println("請求/favicon.ico");
return;
}
// BytBuf:構(gòu)造給客戶端的響應(yīng)內(nèi)容, 制定好編碼
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
// 接下構(gòu)造響應(yīng)對象
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
// 調(diào)用flush才會將內(nèi)容真正返回給客戶端
System.out.println("響應(yīng)給客戶端對象: " + response);
ctx.writeAndFlush(response);
ctx.channel().closeFuture();
}
}
//------以下重寫了ChannelInboundHandlerAdapter父類的方法,分析不同事件方法的調(diào)用時機------
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel register invoked");
super.channelRegistered(ctx);
}
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel unregister invoked");
super.channelUnregistered(ctx);
}
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active invoked");
super.channelActive(ctx);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel inactive invoked");
super.channelInactive(ctx);
}
// TODO 這里執(zhí)行了2次,具體有待分析
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel read complete");
super.channelReadComplete(ctx);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exception caught invoked");
super.exceptionCaught(ctx, cause);
}
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler add invoked");
super.handlerAdded(ctx);
}
}
private final String FAVICON_ICO = "/favicon.ico";
// 讀取客戶端請求,向客戶端響應(yīng)的方法,所以這里要構(gòu)造響應(yīng)返回給客戶端。
// 注意:這里面跟Servlet沒有任何關(guān)系,也符合Servlet規(guī)范,所以不會涉及到HttpServerltRequest和HttpServeletResponse對象。
@Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("--------------httpserverHandler, remote_address " + ctx.channel().remoteAddress() + ", msg_class:" + msg.getClass());
// Thread.sleep(3000); // 休眠5秒鐘,lsof -i:8899 查看TCP連接狀態(tài)
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
System.out.println("請求方法: " + httpRequest.method() + ", 請求path: " + uri.getPath());
if (FAVICON_ICO.equals(uri.getPath())) {
System.out.println("請求/favicon.ico");
return;
}
// BytBuf:構(gòu)造給客戶端的響應(yīng)內(nèi)容, 制定好編碼
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
// 接下構(gòu)造響應(yīng)對象
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
// 調(diào)用flush才會將內(nèi)容真正返回給客戶端
System.out.println("響應(yīng)給客戶端對象: " + response);
ctx.writeAndFlush(response);
ctx.channel().closeFuture();
}
}
//------以下重寫了ChannelInboundHandlerAdapter父類的方法,分析不同事件方法的調(diào)用時機------
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel register invoked");
super.channelRegistered(ctx);
}
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel unregister invoked");
super.channelUnregistered(ctx);
}
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel active invoked");
super.channelActive(ctx);
}
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel inactive invoked");
super.channelInactive(ctx);
}
// TODO 這里執(zhí)行了2次,具體有待分析
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel read complete");
super.channelReadComplete(ctx);
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exception caught invoked");
super.exceptionCaught(ctx, cause);
}
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler add invoked");
super.handlerAdded(ctx);
}
}
public class TestNettyServer {
public static void main(String[] args) throws InterruptedException {
// 服務(wù)器端可以理解為while(true){}死循環(huán),去不斷的接受請求連接。 EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 啟動服務(wù)端,這里的處理器都要是多實例的. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());
System.out.println("服務(wù)端已啟動..");
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
System.out.println("服務(wù)端shutdown");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public static void main(String[] args) throws InterruptedException {
// 服務(wù)器端可以理解為while(true){}死循環(huán),去不斷的接受請求連接。 EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 啟動服務(wù)端,這里的處理器都要是多實例的. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());
System.out.println("服務(wù)端已啟動..");
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
System.out.println("服務(wù)端shutdown");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}