AbstractChannel AbstractNioChannel AbstractNioByteChannel AbstractNioMessageChannel NioServerSocketChannel NioSocketChannel
`Channel`是Netty网络抽象类. 它的功能包括网络IO的读写,链路的连接和关闭, 通信双方的通信地址等.
下面我们看一下Channel提供的API
parent() : 获取父Channel
unsafe() :
localAddress() : 当前Channel的本地绑定地址
eventLoop() : 当前Channel注册到的EventLoop对象
config() : 获取当前Channel的配置信息
remoteAddress() : 当前Channel通信的远程Socket地址
metadata() : 当前Channel的元数据描述信息,例如TCP参数等等
isOpen() : 判断当初Channel是否已经打开
isWritable() : 当前Channel是否可写
isRegistered() : 是否注册当EventLoop上
isActive() : 当前Channel是否处于激活状态
pipeline() : 当前Channel的ChannelPipeline对象
下面的网络IO操作会直接调用ChannelPipeline里的方法, 在ChannelPipeline里进行事件传播
read() : 从Channel中读取数据到inbound缓冲区
write() : 将消息通过ChannelPipeline写入到目标Channel中
close() : 主动关闭与网络对端的连接
flush() : 将之前写到环形队列里的消息全部写到目标Channel中,发送给网络对端
connect() : 与网络对端发起连接请求(一般由客户端调用这个方法)
bind() :
disconnect() : 请求关闭与网络对端的连接.
AbstractChannel 我们首先看一下AbstractChannel里定义的成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();static { CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } private MessageSizeEstimator.Handle estimatorHandle;private final Channel parent;private final Unsafe unsafe;private final ChannelPipeline pipeline;private final ChannelFuture succeededFuture = new SucceededChannelFuture(this , null );private final VoidChannelPromise voidPromise = new VoidChannelPromise(this , true );private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this , false );private final CloseFuture closeFuture = new CloseFuture(this );private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;private volatile EventLoop eventLoop;private volatile boolean registered;private boolean strValActive;private String strVal;
AbstractChannel聚合了所有Channel使用到的能力的对象. 如果某个功能和子类相关则定义抽象方法,由子类去实现.
在这里我们主要关注三个变量
unsafe : 真实网络IO的操作类
pipeline : 当前Channel对应的ChannelPipeline. 负责
eventLoop : 该Channel注册到的EventLoop 在实例化的时候, 会对pipeline和unsafe进行赋值.1 2 3 4 5 protected AbstractChannel (Channel parent) { this .parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this ); }
unsafe实例化由子类实现, 这是因为unsafe的类型是个Unsafe接口, 而且AbstractChannel的内部类AbstractUnsafe是个抽象类, 那么我们就不知道如果要实例化这个类型究竟要使用哪个类型, 因此让AbstractChannel的子类继续实现自己的Unsafe接口的内部类和newUnsafe()方法, unsafe实质类型就有很大的可扩展性
我们看到每一个Channel都有一个自己的pipeline和unsafe. eventLoop是在AbstractUnsafe中register()方法调用时进行赋值的
1 2 3 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; }
AbstractChannel完成的功能很少, 只是实现了一些初始化的工作, 然后将网络相关的建立,数据读写操作等交给pipeline来完成.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public ChannelFuture disconnect (ChannelPromise promise) { return pipeline.disconnect(promise); } @Override public ChannelFuture close (ChannelPromise promise) { return pipeline.close(promise); } @Override public ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public ChannelFuture connect (SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); } Override public Channel read () { pipeline.read(); return this ; } @Override public ChannelFuture write (Object msg) { return pipeline.write(msg); }
还提供了一个unsafe()方法
1 2 3 public Unsafe unsafe () { return unsafe; }
我们看一下AbstractUnsafe的定义protected abstract class AbstractUnsafe implements Unsafe, 它是作为一个AbstractChannel的抽象内部类, 这种关系也很容易让AbstractUnsafe访问AbstractChannel定义的一些空实现方法. 例如AbstractUnsafe中调用AbstractChannel的方法如下
beginRead() -> doBeginRead()
doBind() -> doBind()
doDisconnect() -> doDisconnect()()
doClose() -> doClose()
register() -> doRegister()以及调用pipeline的相关方法(fireChannelRegistered()和fireChannelActive())
AbstractNioChannel AbstractNioChannel主要是实现了AbstractChannel的doRegister(), doDeregister(), doBeginRead()方法. 通过下面的变量我们也可以看出这个类主要是为了完成SelectableChannel向Selector的注册功能.
1 2 3 private final SelectableChannel ch;protected final int readInterestOp;volatile SelectionKey selectionKey;
java.nio.channels.ServerSocketChannel和java.nio.channels.SocketChannel都是实现了java.nio.channels.SelectableChannel接口. 而NioSocketChannel和NioServerSocketChannel实现了AbstractNioChannel接口, 因此我们在AbstractNioChannel内定义了一个SelectableChannel成员用于实现ServerSocketChannel和SocketChannel的共用
然后我们看一下doRegister()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
最后我们看一下doBeginRead()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected void doBeginRead () throws Exception { if (inputShutdown) { return ; } final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
还记得在AbstractChannel中的AbstractUnsafe吗?里面有个beginRead(), 这个doBeginRead()正是由其调用的.
AbstractNioByteChannel AbstractNioByteChannel内部只有一个Runnable类型的flushTask属性, 它是用来写半包的, 当我们使用到它的时候,我们再具体分析. 我们来重点看一下doWrite()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 protected void doWrite (ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1 ; for (;;) { Object msg = in.current(); if (msg == null ) { clearOpWrite(); break ; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0 ) { in.remove(); continue ; } boolean setOpWrite = false ; boolean done = false ; long flushedAmount = 0 ; if (writeSpinCount == -1 ) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1 ; i >= 0 ; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0 ) { setOpWrite = true ; break ; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true ; break ; } } in.progress(flushedAmount); if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break ; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = region.transfered() >= region.count(); boolean setOpWrite = false ; if (!done) { long flushedAmount = 0 ; if (writeSpinCount == -1 ) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1 ; i >= 0 ; i--) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0 ) { setOpWrite = true ; break ; } flushedAmount += localFlushedAmount; if (region.transfered() >= region.count()) { done = true ; break ; } } in.progress(flushedAmount); } if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break ; } } else { throw new Error(); } } }
doWrite()方法是由AbstractUnsafe的flush()调用的. 从AbstractUnsafe我们可以看到每个Unsafe类都有一个ChannelOutboundBuffer属性.
下来我们看一下incompleteWrite()方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected final void incompleteWrite (boolean setOpWrite) { if (setOpWrite) { setOpWrite(); } else { Runnable flushTask = this .flushTask; if (flushTask == null ) { flushTask = this .flushTask = new Runnable() { @Override public void run () { flush(); } }; } eventLoop().execute(flushTask); } }
AbstractNioMessageChannel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 protected void doWrite (ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null ) { if ((interestOps & SelectionKey.OP_WRITE) != 0 ) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break ; } try { boolean done = false ; for (int i = config().getWriteSpinCount() - 1 ; i >= 0 ; i--) { if (doWriteMessage(msg, in)) { done = true ; break ; } } if (done) { in.remove(); } else { if ((interestOps & SelectionKey.OP_WRITE) == 0 ) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break ; } } catch (IOException e) { if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } }
NioServerSocketChannel NioServerSocketChannel的主要作用是接受客户端连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected int doReadMessages (List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null ) { buf.add(new NioSocketChannel(this , ch)); return 1 ; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket." , t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket." , t2); } } return 0 ; }
这个方法调用主要是由NioMessageUnsafe的read()方法调用
NioSocketChannel