AbstractChannel AbstractNioChannel AbstractNioByteChannel AbstractNioMessageChannel NioServerSocketChannel NioSocketChannel
`Channel`是Netty网络抽象类. 它的功能包括网络IO的读写,链路的连接和关闭, 通信双方的通信地址等.
下面我们看一下Channel提供的API
parent()
: 获取父Channel
unsafe()
:
localAddress()
: 当前Channel的本地绑定地址
eventLoop()
: 当前Channel注册到的EventLoop对象
config()
: 获取当前Channel的配置信息
remoteAddress()
: 当前Channel通信的远程Socket地址
metadata()
: 当前Channel的元数据描述信息,例如TCP参数等等
isOpen()
: 判断当初Channel是否已经打开
isWritable()
: 当前Channel是否可写
isRegistered()
: 是否注册当EventLoop上
isActive()
: 当前Channel是否处于激活状态
pipeline()
: 当前Channel的ChannelPipeline对象
下面的网络IO操作会直接调用ChannelPipeline里的方法, 在ChannelPipeline里进行事件传播
read()
: 从Channel中读取数据到inbound缓冲区
write()
: 将消息通过ChannelPipeline写入到目标Channel中
close()
: 主动关闭与网络对端的连接
flush()
: 将之前写到环形队列里的消息全部写到目标Channel中,发送给网络对端
connect()
: 与网络对端发起连接请求(一般由客户端调用这个方法)
bind()
:
disconnect()
: 请求关闭与网络对端的连接.
AbstractChannel 我们首先看一下AbstractChannel
里定义的成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();static { CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } private MessageSizeEstimator.Handle estimatorHandle;private final Channel parent;private final Unsafe unsafe;private final ChannelPipeline pipeline;private final ChannelFuture succeededFuture = new SucceededChannelFuture(this , null );private final VoidChannelPromise voidPromise = new VoidChannelPromise(this , true );private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this , false );private final CloseFuture closeFuture = new CloseFuture(this );private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;private volatile EventLoop eventLoop;private volatile boolean registered;private boolean strValActive;private String strVal;
AbstractChannel
聚合了所有Channel使用到的能力的对象. 如果某个功能和子类相关则定义抽象方法,由子类去实现.
在这里我们主要关注三个变量
unsafe
: 真实网络IO的操作类
pipeline
: 当前Channel对应的ChannelPipeline. 负责
eventLoop
: 该Channel注册到的EventLoop 在实例化的时候, 会对pipeline
和unsafe
进行赋值.1 2 3 4 5 protected AbstractChannel (Channel parent) { this .parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this ); }
unsafe实例化由子类实现, 这是因为unsafe的类型是个Unsafe接口, 而且AbstractChannel的内部类AbstractUnsafe是个抽象类, 那么我们就不知道如果要实例化这个类型究竟要使用哪个类型, 因此让AbstractChannel的子类继续实现自己的Unsafe接口的内部类和newUnsafe()方法, unsafe实质类型就有很大的可扩展性
我们看到每一个Channel都有一个自己的pipeline
和unsafe
. eventLoop
是在AbstractUnsafe
中register()
方法调用时进行赋值的
1 2 3 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; }
AbstractChannel
完成的功能很少, 只是实现了一些初始化的工作, 然后将网络相关的建立,数据读写操作等交给pipeline
来完成.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public ChannelFuture disconnect (ChannelPromise promise) { return pipeline.disconnect(promise); } @Override public ChannelFuture close (ChannelPromise promise) { return pipeline.close(promise); } @Override public ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public ChannelFuture connect (SocketAddress remoteAddress, ChannelPromise promise) { return pipeline.connect(remoteAddress, promise); } Override public Channel read () { pipeline.read(); return this ; } @Override public ChannelFuture write (Object msg) { return pipeline.write(msg); }
还提供了一个unsafe()
方法
1 2 3 public Unsafe unsafe () { return unsafe; }
我们看一下AbstractUnsafe
的定义protected abstract class AbstractUnsafe implements Unsafe
, 它是作为一个AbstractChannel
的抽象内部类, 这种关系也很容易让AbstractUnsafe
访问AbstractChannel
定义的一些空实现方法. 例如AbstractUnsafe
中调用AbstractChannel
的方法如下
beginRead()
-> doBeginRead()
doBind()
-> doBind()
doDisconnect()
-> doDisconnect()()
doClose()
-> doClose()
register()
-> doRegister()
以及调用pipeline的相关方法(fireChannelRegistered()和fireChannelActive())
AbstractNioChannel AbstractNioChannel
主要是实现了AbstractChannel
的doRegister(), doDeregister(), doBeginRead()
方法. 通过下面的变量我们也可以看出这个类主要是为了完成SelectableChannel
向Selector
的注册功能.
1 2 3 private final SelectableChannel ch;protected final int readInterestOp;volatile SelectionKey selectionKey;
java.nio.channels.ServerSocketChannel
和java.nio.channels.SocketChannel
都是实现了java.nio.channels.SelectableChannel
接口. 而NioSocketChannel
和NioServerSocketChannel
实现了AbstractNioChannel
接口, 因此我们在AbstractNioChannel
内定义了一个SelectableChannel
成员用于实现ServerSocketChannel
和SocketChannel
的共用
然后我们看一下doRegister()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
最后我们看一下doBeginRead()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected void doBeginRead () throws Exception { if (inputShutdown) { return ; } final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
还记得在AbstractChannel
中的AbstractUnsafe
吗?里面有个beginRead()
, 这个doBeginRead()
正是由其调用的.
AbstractNioByteChannel AbstractNioByteChannel
内部只有一个Runnable
类型的flushTask
属性, 它是用来写半包的, 当我们使用到它的时候,我们再具体分析. 我们来重点看一下doWrite()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 protected void doWrite (ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1 ; for (;;) { Object msg = in.current(); if (msg == null ) { clearOpWrite(); break ; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0 ) { in.remove(); continue ; } boolean setOpWrite = false ; boolean done = false ; long flushedAmount = 0 ; if (writeSpinCount == -1 ) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1 ; i >= 0 ; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0 ) { setOpWrite = true ; break ; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true ; break ; } } in.progress(flushedAmount); if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break ; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = region.transfered() >= region.count(); boolean setOpWrite = false ; if (!done) { long flushedAmount = 0 ; if (writeSpinCount == -1 ) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1 ; i >= 0 ; i--) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0 ) { setOpWrite = true ; break ; } flushedAmount += localFlushedAmount; if (region.transfered() >= region.count()) { done = true ; break ; } } in.progress(flushedAmount); } if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break ; } } else { throw new Error(); } } }
doWrite()
方法是由AbstractUnsafe
的flush()
调用的. 从AbstractUnsafe
我们可以看到每个Unsafe类都有一个ChannelOutboundBuffer
属性.
下来我们看一下incompleteWrite()
方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected final void incompleteWrite (boolean setOpWrite) { if (setOpWrite) { setOpWrite(); } else { Runnable flushTask = this .flushTask; if (flushTask == null ) { flushTask = this .flushTask = new Runnable() { @Override public void run () { flush(); } }; } eventLoop().execute(flushTask); } }
AbstractNioMessageChannel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 protected void doWrite (ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null ) { if ((interestOps & SelectionKey.OP_WRITE) != 0 ) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break ; } try { boolean done = false ; for (int i = config().getWriteSpinCount() - 1 ; i >= 0 ; i--) { if (doWriteMessage(msg, in)) { done = true ; break ; } } if (done) { in.remove(); } else { if ((interestOps & SelectionKey.OP_WRITE) == 0 ) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break ; } } catch (IOException e) { if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } }
NioServerSocketChannel NioServerSocketChannel
的主要作用是接受客户端连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected int doReadMessages (List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null ) { buf.add(new NioSocketChannel(this , ch)); return 1 ; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket." , t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket." , t2); } } return 0 ; }
这个方法调用主要是由NioMessageUnsafe
的read()
方法调用
NioSocketChannel