Netty 簡單樣例分析
Netty 是JBoss旗下的io傳輸?shù)目蚣埽胘ava里面的nio來實(shí)現(xiàn)高效,穩(wěn)定的io傳輸。
作為io傳輸,就會有client和server,下面我們看看用netty怎樣寫client和server
Client:
需要做的事情:
1.配置client啟動類
ClientBootstrap bootstrap = new ClientBootstrap(..)
2.根據(jù)不同的協(xié)議或者模式為client啟動類設(shè)置pipelineFactory。
這里telnet pipline Factory 在netty中已經(jīng)存在,所有直接用
bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
也可以自己定義
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new DiscardClientHandler(firstMessageSize));
}
});
這里DiscardClientHandler 就是自己定義的handler,他需要
public class DiscardServerHandler extends SimpleChannelUpstreamHandler
繼承SimpleChannelUpstreamHandler 來實(shí)現(xiàn)自己的handler。這里DiscardClientHandler
是處理自己的client端的channel,他的
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Server is supposed to send nothing. Therefore, do nothing.
}
可以看到Discard client不需要接受任何信息
3.連接server
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
這里解釋一下channelFuture:
在Netty中所有的io操作都是異步的,這也就是意味任何io訪問,那么就立即返回處理,并且不能確保
返回的數(shù)據(jù)全部完成。因此就出現(xiàn)了channelFuture,channelFuture在傳輸數(shù)據(jù)時(shí)候包括數(shù)據(jù)和狀態(tài)兩個(gè)
部分。他只有Uncompleted和Completed
既然netty io是異步的,那么如何知道channel傳送完成有兩種方式,一種添加監(jiān)聽器
addListener(ChannelFutureListener) 還有一種直接調(diào)用await()方法,這兩種方式
有下面的區(qū)別
監(jiān)聽器:是以事件模式的,因此代碼就需要用事件模式的樣式去寫,相當(dāng)復(fù)雜,但他是non-blocking模式的
性能方面要比await方法好,而且不會產(chǎn)生死鎖情況
await(): 直接方法調(diào)用,使用簡單,但是他是blocking模式,性能方面要弱而且會產(chǎn)生死鎖情況
不要在ChannelHandler 里面調(diào)用await(),這是因?yàn)橥ǔT赾hannelHandler里的event method是被i/o線程調(diào)用的
(除非ChannelPipeline里面有個(gè)ExecutionHandler),那么如果這個(gè)時(shí)候用await就容易產(chǎn)生死鎖。
錯(cuò)誤樣例:
// BAD - NEVER DO THIS
* {@code @Override}
* public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
* if (e.getMessage() instanceof GoodByeMessage) {
* {@link ChannelFuture} future = e.getChannel().close();
* future.awaitUninterruptibly();
* // Perform post-closure operation
* // ...
* }
* }
*
正確樣例:
* // GOOD
* {@code @Override}
* public void messageReceived({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
* if (e.getMessage() instanceof GoodByeMessage) {
* {@link ChannelFuture} future = e.getChannel().close();
* future.addListener(new {@link ChannelFutureListener}() {
* public void operationComplete({@link ChannelFuture} future) {
* // Perform post-closure operation
* // ...
* }
* });
* }
* }
雖然await調(diào)用比較危險(xiǎn),但是你確保不是在一個(gè)i/o 線程中調(diào)用該方法,畢竟await方法還是很簡潔方便的,如果
調(diào)用該方法是在一個(gè)i/o 線程,那么就會拋出 IllegalStateException
await的timeout和i/o timeout區(qū)別
需要注意的是這兩個(gè)timeout是不一樣的, #await(long),#await(long, TimeUnit), #awaitUninterruptibly(long),
#awaitUninterruptibly(long, TimeUnit) 這里面的timeout也i/o timeout 沒有任何關(guān)系,如果io timeout,那么
channelFuture 將被標(biāo)記為completed with failure,而await的timeout 與future完全沒有關(guān)系,只是await動作的
timeout。
錯(cuò)誤代碼
* // BAD - NEVER DO THIS
* {@link ClientBootstrap} b = ...;
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly(10, TimeUnit.SECONDS);
* if (f.isCancelled()) {
* // Connection attempt cancelled by user
* } else if (!f.isSuccess()) {
* // You might get a NullPointerException here because the future
* // might not be completed yet.
* f.getCause().printStackTrace();
* } else {
* // Connection established successfully
* }
*
正確代碼
* // GOOD
* {@link ClientBootstrap} b = ...;
* // Configure the connect timeout option.
* <b>b.setOption("connectTimeoutMillis", 10000);</b>
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly();
*
* // Now we are sure the future is completed.
* assert f.isDone();
*
* if (f.isCancelled()) {
* // Connection attempt cancelled by user
* } else if (!f.isSuccess()) {
* f.getCause().printStackTrace();
* } else {
* // Connection established successfully
* }
4.等待或監(jiān)聽數(shù)據(jù)全部完成
如: future.getChannel().getCloseFuture().awaitUninterruptibly();
5.釋放連接等資源
bootstrap.releaseExternalResources();
Server:
1.配置server
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
2.設(shè)置pipeFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new EchoServerHandler());
}
});
或者
bootstrap.setPipelineFactory(new HttpServerPipelineFactory());
3.綁定sever端端口
bootstrap.bind(new InetSocketAddress(8080));