netty3.2.3源碼分析--服務器端發送數據分析
Posted on 2010-12-04 14:54 alex_zheng 閱讀(1281) 評論(0) 編輯 收藏 所屬分類: java
上一篇分析了服務器端讀取客戶發送的數據,這篇來看服務器端如何發送數據給客戶端,服務器往外發送數據是通過downstreamhandler從下到上執行
發送從ChannelFuture future = e.getChannel().write(response)開始,執行Channels下的write方法:
/**
 * Wraps {@code message} in a {@link DownstreamMessageEvent} and sends it
 * down the pipeline of {@code channel}.
 *
 * @param channel       the channel whose pipeline receives the downstream event
 * @param message       the payload carried by the event
 * @param remoteAddress the destination address carried by the event
 * @return the future created for this write; the same instance is attached
 *         to the event that is sent downstream
 */
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
    final ChannelFuture writeFuture = future(channel);
    final DownstreamMessageEvent writeEvent =
            new DownstreamMessageEvent(channel, writeFuture, message, remoteAddress);
    channel.getPipeline().sendDownstream(writeEvent);
    return writeFuture;
}
telnet pipeline中最下面一個downstreamhandler是stringencoder,最后執行OneToOneEncoder的handleDownstream:
ChannelFuture future = future(channel);
channel.getPipeline().sendDownstream(
new DownstreamMessageEvent(channel, future, message, remoteAddress));
return future;
}
/**
 * Encodes outgoing messages before they continue downstream.
 *
 * <p>Events that are not {@link MessageEvent}s are forwarded untouched.
 * For a message event, the payload is passed to {@code encode}:
 * <ul>
 *   <li>if {@code encode} returns the very same object, the original event
 *       is forwarded as-is;</li>
 *   <li>if it returns a different non-null object, a new write is issued
 *       with the encoded payload, reusing the event's future and remote
 *       address;</li>
 *   <li>if it returns {@code null} for a non-null payload, nothing is sent.</li>
 * </ul>
 *
 * @param ctx the context of this handler
 * @param evt the downstream event to process
 * @throws Exception if {@code encode} or the downstream call fails
 */
public void handleDownstream(
        ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
    if (evt instanceof MessageEvent) {
        final MessageEvent messageEvent = (MessageEvent) evt;
        final Object rawMessage = messageEvent.getMessage();
        final Object encoded = encode(ctx, messageEvent.getChannel(), rawMessage);

        if (encoded == rawMessage) {
            // Encoder left the message alone: pass the original event along.
            ctx.sendDownstream(evt);
        } else if (encoded != null) {
            // Write the encoded data; per the article this continues through
            // DefaultChannelPipeline's sendDownstream.
            write(ctx, messageEvent.getFuture(), encoded, messageEvent.getRemoteAddress());
        }
    } else {
        // Not a message: nothing to encode, just forward it.
        ctx.sendDownstream(evt);
    }
}
DefaultChannelPipeline的sendDownstream方法:
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (!(evt instanceof MessageEvent)) {
ctx.sendDownstream(evt);
return;
}
MessageEvent e = (MessageEvent) evt;
Object originalMessage = e.getMessage();
Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
if (originalMessage == encodedMessage) {
ctx.sendDownstream(evt);
} else if (encodedMessage != null) {
//這里寫encode數(shù)據(jù),DefaultChannelPipeline的sendDownstream
write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
}
}
/**
 * Passes {@code e} to the next downstream handler context, or to the
 * pipeline's sink when no such context exists.
 *
 * <p>Any {@link Throwable} raised by the sink is routed to
 * {@code notifyHandlerException} rather than propagated to the caller.
 *
 * @param e the event to send downstream
 */
public void sendDownstream(ChannelEvent e) {
    final DefaultChannelHandlerContext target = getActualDownstreamContext(this.prev);
    if (target != null) {
        DefaultChannelPipeline.this.sendDownstream(target, e);
        return;
    }

    try {
        // Per the article: StringEncoder is the only downstream handler, so
        // NioServerSocketPipelineSink.eventSunk is what runs here.
        getSink().eventSunk(DefaultChannelPipeline.this, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
eventSunk方法會執行:
DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
if (prev == null) {
try {
//因?yàn)閟tringencoder是唯一一個(gè)downstreamhandler,這里執(zhí)行NioServerSocketPipelineSink.eventSunk
getSink().eventSunk(DefaultChannelPipeline.this, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
} else {
DefaultChannelPipeline.this.sendDownstream(prev, e);
}
}
/**
 * Handles an event that reached the sink for an accepted socket channel.
 *
 * <p>State events are translated into worker calls:
 * <ul>
 *   <li>{@code OPEN} with value {@code FALSE}: close the channel;</li>
 *   <li>{@code BOUND} / {@code CONNECTED} with a {@code null} value: close
 *       the channel;</li>
 *   <li>{@code INTEREST_OPS}: apply the integer value via
 *       {@code setInterestOps}.</li>
 * </ul>
 * Message events are appended to the channel's write buffer and the worker
 * is asked to write. Any other event type is ignored.
 *
 * @param e the event delivered to the sink
 */
private void handleAcceptedSocket(ChannelEvent e) {
    if (e instanceof ChannelStateEvent) {
        final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
        final NioSocketChannel socketChannel = (NioSocketChannel) stateEvent.getChannel();
        final ChannelFuture stateFuture = stateEvent.getFuture();
        final ChannelState requestedState = stateEvent.getState();
        final Object stateValue = stateEvent.getValue();

        switch (requestedState) {
        case OPEN:
            if (Boolean.FALSE.equals(stateValue)) {
                socketChannel.worker.close(socketChannel, stateFuture);
            }
            break;
        case BOUND:
        case CONNECTED:
            // A null value is treated as a request to close.
            if (stateValue == null) {
                socketChannel.worker.close(socketChannel, stateFuture);
            }
            break;
        case INTEREST_OPS:
            socketChannel.worker.setInterestOps(
                    socketChannel, stateFuture, ((Integer) stateValue).intValue());
            break;
        }
        return;
    }

    if (e instanceof MessageEvent) {
        final MessageEvent writeRequest = (MessageEvent) e;
        final NioSocketChannel socketChannel = (NioSocketChannel) writeRequest.getChannel();
        // Put the event into the channel's write request queue.
        boolean offered = socketChannel.writeBuffer.offer(writeRequest);
        assert offered;
        // Run NioWorker's writeFromUserCode; per the article this goes on to
        // execute write0.
        socketChannel.worker.writeFromUserCode(socketChannel);
    }
}
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.worker.close(channel, future);
}
break;
case BOUND:
case CONNECTED:
if (value == null) {
channel.worker.close(channel, future);
}
break;
case INTEREST_OPS:
channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
//放入writerequestqueue隊(duì)列
boolean offered = channel.writeBuffer.offer(event);
assert offered;
//執(zhí)行nioworker的writeFromUserCode,之后執(zhí)行write0方法
channel.worker.writeFromUserCode(channel);
}
}
/**
 * Drains the channel's pending write events to its socket.
 *
 * <p>While holding {@code channel.writeLock}, this loops over the events in
 * {@code channel.writeBuffer}: each event's message is turned into a
 * {@code SendBuffer} and transferred to {@code channel.socket}. A fully
 * written buffer completes the event's future and the loop moves on; a
 * partially written buffer leaves the event as the channel's current write
 * and stops the loop. After the lock is released a write-complete event is
 * fired with the bytes counted in this call, and, if the channel was not
 * closed because of an {@link IOException}, {@code setOpWrite} or
 * {@code clearOpWrite} is called depending on how the loop ended.
 *
 * @param channel the channel whose queued writes should be flushed
 */
private void write0(NioSocketChannel channel) {
// Cleared only when an IOException forces the channel to be closed.
boolean open = true;
// Set when a buffer could not be written completely.
boolean addOpWrite = false;
// Set when the write queue was found empty.
boolean removeOpWrite = false;
// Bytes counted during this invocation; reported via fireWriteComplete.
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final SocketChannel ch = channel.socket;
// Original author's note: the write event was put into this queue earlier.
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
// Original author's note: by default 16 write attempts are made.
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
// Resume a previously unfinished write if there is one.
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
// Take the next write event from the write buffer.
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
// Queue is empty: nothing left to write.
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
ChannelFuture future = evt.getFuture();
try {
long localWrittenBytes = 0;
// Try up to writeSpinCount times; stop at the first attempt that
// writes something or once the buffer reports it is finished.
for (int i = writeSpinCount; i > 0; i --) {
// Send the data to the client; per the original author this executes
// PooledSendBuffer.transferTo.
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (localWrittenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
// NOTE(review): the current event/buffer are left in place and the
// loop iterates again with them.
} catch (Throwable t) {
// Drop the failed event, fail its future and report the error.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
if (t instanceof IOException) {
open = false;
close(channel, succeededFuture(channel));
}
// NOTE(review): there is no break here, so the loop continues with
// the next queued event even after close() was requested.
}
}
channel.inWriteNowLoop = false;
}
// Fire the write-complete event. Per the original author this goes through
// DefaultChannelPipeline's sendUpstream and ends up calling
// SimpleChannelUpstreamHandler.writeComplete; none of the upstream handlers
// in the pipeline override writeComplete, so the event is simply passed along.
fireWriteComplete(channel, writtenBytes);
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final SocketChannel ch = channel.socket;
//之前將channel放到了該隊(duì)列
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
//默認(rèn)嘗試16次寫
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
//從writebuffer中獲得一個(gè)writeevent
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
ChannelFuture future = evt.getFuture();
try {
long localWrittenBytes = 0;
for (int i = writeSpinCount; i > 0; i --) {
//發(fā)送數(shù)據(jù)給客戶端,執(zhí)行PooledSendBuffer.transferTo
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
if (localWrittenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
if (t instanceof IOException) {
open = false;
close(channel, succeededFuture(channel));
}
}
}
channel.inWriteNowLoop = false;
}
//觸發(fā)寫完成事件,執(zhí)行的是DefaultChannelPipeline的sendUpstream,最后調(diào)用SimpleChannelUpstreamHandler.writeComplete
//pipeline中的upstreamhandler的writeComplete都未重寫,所以只是簡單的傳遞該事件
fireWriteComplete(channel, writtenBytes);
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}