beginRead() : Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the {@link ChannelPipeline}. If there’s already a pending read operation, this method does nothing.
write() : 调度一个写操作
flush() : 将此前通过 write() 调度的所有写操作全部刷出（真正执行写出）
outboundBuffer() : Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
@Override publicvoidread(){ final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; }
// stop reading if (!config.isAutoRead()) { break; }
if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } // 如果仍然有未读数据的话, 则继续读取 } while (++ messages < maxMessagesPerRead);
if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }
privatefinal List<Object> readBuf = new ArrayList<Object>();
@Override publicvoidread(){ asserteventLoop().inEventLoop(); final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; }
if (exception != null) { if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) { // ServerChannel should not be closed even on IOException because it can often continue // accepting incoming connections. (e.g. too many open files) closed = !(AbstractNioMessageChannel.thisinstanceof ServerChannel); }
pipeline.fireExceptionCaught(exception); }
if (closed) { if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }
// Populates SIZE_TABLE once, at class-initialization time, with an ascending
// sequence of sizes: linear 16-byte steps below 512, then powers of two.
static {
    List<Integer> sizeTable = new ArrayList<Integer>();

    // While the message is smaller than 512, step by 16 bytes each time, i.e.
    // predict that the next message is 16 bytes larger than the current one.
    // Adds 16, 32, ..., 496.
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    // Once the message size reaches 512, grow by doubling instead.
    // The loop relies on int overflow to terminate: after 2^30 the left shift
    // yields a negative value, so `i > 0` fails and 2^30 is the last entry.
    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    // Copy the boxed list into the primitive int[] table.
    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}