We start with a small Netty example.
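```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        int cpuSize = Runtime.getRuntime().availableProcessors();
        // Boss group (main Reactor): a single thread that accepts connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker group (sub Reactor): handles I/O on the accepted connections.
        EventLoopGroup workerGroup = new NioEventLoopGroup(cpuSize);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // TCP_NODELAY applies to accepted sockets, so it is set as a child option.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.AUTO_READ, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new InHandler());
                        }
                    });

            ChannelFuture f = b.bind(8881).sync();
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class InHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Queue the received message to be written back to the client.
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
```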
This example uses the main/sub Reactor (boss/worker) threading model and simply writes whatever it receives back to the client.
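To exercise the echo behaviour described above, a plain JDK socket client is enough. The following is only a minimal sketch: the class name `EchoClientSketch` and the helper `echoOnce` are made up for illustration, and it assumes the server is listening on `127.0.0.1:8881` as in the example.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Netty: a blocking client for an echo server.
final class EchoClientSketch {

    /** Sends text and reads back exactly as many bytes as were sent. */
    static String echoOnce(String host, int port, String text) throws IOException {
        byte[] out = text.getBytes(StandardCharsets.UTF_8);
        try (Socket socket = new Socket(host, port)) {
            OutputStream os = socket.getOutputStream();
            os.write(out);
            os.flush();

            InputStream is = socket.getInputStream();
            byte[] in = new byte[out.length];
            int read = 0;
            // A single read() may return fewer bytes than requested, so loop.
            while (read < in.length) {
                int n = is.read(in, read, in.length - read);
                if (n < 0) {
                    throw new EOFException("server closed the connection early");
                }
                read += n;
            }
            return new String(in, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes NettyServer from the example is already running on port 8881.
        System.out.println(echoOnce("127.0.0.1", 8881, "hello netty"));
    }
}
```

With the server running, `main` should print back the text it sent.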
Next we walk through the source of `ServerBootstrap`, starting from the `bind()` method.
Because `bind()` ultimately calls `doBind()` in the parent class `AbstractBootstrap`, we start with the parent class.
```java
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ...
        return promise;
    }
}
```
Next, let's look at `AbstractBootstrap#initAndRegister()`.
```java
final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        // Error handling omitted in this excerpt.
    }

    // Register the channel with an EventLoop of the (boss) group.
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
```
`init()` is implemented by `ServerBootstrap`. Its main job is to add a `ServerBootstrapAcceptor` handler to the pipeline of the `NioServerSocketChannel`. This handler processes the list of `NioSocketChannel` messages that is produced when `NioMessageUnsafe#read()` calls `NioServerSocketChannel#doReadMessages()`.
```java
@Override
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    // Initialization of the two arrays below is omitted in this excerpt.
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            // The acceptor is what hands accepted channels over to the child group.
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler,
                    currentChildOptions, currentChildAttrs));
        }
    });
}
```
`ServerBootstrapAcceptor` also extends `ChannelInboundHandlerAdapter`, so it is a handler too. `NioMessageUnsafe#read()` iterates over the list of `NioSocketChannel` messages and calls `fireChannelRead()` on the pipeline of the `NioServerSocketChannel` for each one, which in turn triggers `ServerBootstrapAcceptor#channelRead()`:
```java
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;

    ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler childHandler) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Each message is a freshly accepted child channel.
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);

        try {
            // Register the child with an EventLoop of the worker group.
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Listener body omitted in this excerpt.
                }
            });
        } catch (Throwable t) {
            // Error handling omitted in this excerpt.
        }
    }
}
```
Next, let's follow the `register()` call on `NioEventLoop`. The call ultimately ends up in `SingleThreadEventLoop`:
```java
// SingleThreadEventLoop
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    channel.unsafe().register(this, promise);
    return promise;
}
```
This in turn calls `register()` on `AbstractUnsafe`:
```java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        // Already on the event loop thread: register directly.
        register0(promise);
    } else {
        try {
            // Otherwise hand the registration over to the event loop thread.
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // Error handling omitted in this excerpt.
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        doRegister();
        pipeline.fireChannelRegistered();
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Error handling omitted in this excerpt.
    }
}
```
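The `inEventLoop()` check in `register()` is the usual way to make sure that channel state is only touched by the thread that owns the channel. The sketch below illustrates that pattern with plain JDK classes. `MiniEventLoop` and `threadThatRan` are hypothetical names invented for this illustration, and the code is not how Netty implements its event loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded event loop, for illustration only.
final class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread thread;

    MiniEventLoop(String name) {
        thread = new Thread(() -> {
            try {
                while (true) {
                    tasks.take().run(); // run queued tasks one by one
                }
            } catch (InterruptedException e) {
                // Interrupted: stop the loop.
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    /** True if the caller is the loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);
    }

    /** Same shape as the register() excerpt: run inline or hand over to the loop. */
    void register(Runnable register0) {
        if (inEventLoop()) {
            register0.run();
        } else {
            execute(register0);
        }
    }

    /** Registers from the calling thread and reports which thread ran the task. */
    static String threadThatRan(MiniEventLoop loop) throws Exception {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        loop.register(() -> ranOn.complete(Thread.currentThread().getName()));
        return ranOn.get(5, TimeUnit.SECONDS);
    }
}
```

Calling `threadThatRan(new MiniEventLoop("loop-1"))` from any other thread returns `"loop-1"`, because the task is queued and executed by the loop thread rather than by the caller.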
`register0()` then calls `doRegister()` on `AbstractNioChannel`:
```java
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Interest ops 0: register only; the Netty channel itself is the attachment.
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            // Retry handling omitted in this excerpt.
        }
    }
}
```
This is where the underlying JDK channel finally gets registered with the EventLoop's I/O multiplexer (its `Selector`).
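The same registration step can be reproduced with the JDK alone. The sketch below registers a `ServerSocketChannel` with an interest set of `0` and an attachment, as `doRegister()` does, and only afterwards enables `OP_ACCEPT`. `RegisterSketch` and `demo` are hypothetical names, and enabling the interest as a separate later step is my understanding of what Netty does, not something shown in the excerpt.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, plain JDK NIO only.
final class RegisterSketch {

    /**
     * Returns { interestOps right after register, interestOps after enabling accept,
     * 1 if the attachment is the object passed to register, else 0 }.
     */
    static int[] demo() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before registering with a Selector

            Object attachment = new Object(); // Netty attaches the channel object here
            SelectionKey key = ch.register(selector, 0, attachment);
            int before = key.interestOps();

            // Bind to an ephemeral loopback port, then start listening for accepts.
            ch.bind(new InetSocketAddress("127.0.0.1", 0));
            key.interestOps(SelectionKey.OP_ACCEPT);
            int after = key.interestOps();

            return new int[] { before, after, key.attachment() == attachment ? 1 : 0 };
        }
    }
}
```

Registering with `0` first keeps the selector from reporting any events until the channel is fully set up; the attachment lets the selector loop map a ready key back to its owning object.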
With registration covered, we go back to `doBind()` and continue with `doBind0()`:
```java
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // The bind is always executed on the channel's event loop thread.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise)
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
```
As we can see, binding also goes through the channel's `bind()` method, which is implemented in `AbstractChannel`:
```java
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
```
That in turn calls `bind()` on `DefaultChannelPipeline`:
```java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
```
For the remaining details of the bind, refer to the implementation of `DefaultChannelPipeline`.
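Because the pipeline starts the bind at `tail`, the operation travels through the pipeline as an outbound event, from the tail towards the head. The sketch below only models that direction of travel. `MiniPipeline` is a hypothetical class, handlers are reduced to names, and the final `unsafe.bind(...)` step at the head reflects my understanding of Netty rather than anything shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of outbound propagation; not Netty's DefaultChannelPipeline.
final class MiniPipeline {
    private final List<String> handlers = new ArrayList<>();

    /** Appends a handler (represented only by its name) before the tail. */
    MiniPipeline addLast(String name) {
        handlers.add(name);
        return this;
    }

    /** Returns the order in which a bind request visits the pipeline. */
    List<String> bind(String localAddress) {
        List<String> trace = new ArrayList<>();
        trace.add("tail"); // outbound operations enter at the tail
        for (int i = handlers.size() - 1; i >= 0; i--) {
            trace.add(handlers.get(i)); // then walk back towards the head
        }
        // Assumption: the head is where the transport-level bind is performed.
        trace.add("head: unsafe.bind(" + localAddress + ")");
        return trace;
    }
}
```

For example, `new MiniPipeline().addLast("a").addLast("b").bind("0.0.0.0:8881")` returns `[tail, b, a, head: unsafe.bind(0.0.0.0:8881)]`.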
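To summarize:
First, the `NioServerSocketChannel` is registered with the Selector of the main Reactor (boss) thread pool.
When the `NioServerSocketChannel` accepts network connections (in `doReadMessages()`), it produces a message list of `NioSocketChannel` instances.
`ServerBootstrapAcceptor` then registers each `NioSocketChannel` with a Selector of the sub Reactor (worker) thread pool.
Finally, the Selectors of the sub Reactor thread pool drive the I/O, reading and writing network data.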
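The four steps above can be sketched with plain JDK NIO. This is a deliberately simplified illustration, not Netty's implementation: `MiniReactorServer`, `Worker`, and `port()` are hypothetical names, the server binds to an ephemeral loopback port, and error handling and shutdown are reduced to the bare minimum.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical main/sub Reactor echo server built on the JDK only.
final class MiniReactorServer {

    /** Sub Reactor: owns one Selector and echoes data on its connections. */
    static final class Worker implements Runnable {
        private final Selector selector = Selector.open();
        private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();

        Worker() throws IOException { }

        /** Called by the boss thread; the actual registration happens on the worker thread. */
        void register(SocketChannel ch) {
            pending.add(ch);
            selector.wakeup();
        }

        @Override
        public void run() {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                while (selector.isOpen()) {
                    selector.select();
                    SocketChannel ch;
                    while ((ch = pending.poll()) != null) {
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SocketChannel c = (SocketChannel) it.next().channel();
                        it.remove();
                        buf.clear();
                        if (c.read(buf) < 0) { // peer closed the connection
                            c.close();
                            continue;
                        }
                        buf.flip();
                        while (buf.hasRemaining()) {
                            c.write(buf); // simplistic: spins if the send buffer is full
                        }
                    }
                }
            } catch (IOException | ClosedSelectorException e) {
                // Sketch only: stop this worker on any error.
            }
        }
    }

    private final Selector bossSelector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();
    private final Worker[] workers;
    private int next; // round-robin index, touched only by the boss thread

    MiniReactorServer(int workerCount) throws IOException {
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        // Step 1: the server channel is registered with the main Reactor's Selector.
        server.register(bossSelector, SelectionKey.OP_ACCEPT);
        workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker();
            startDaemon(workers[i], "worker-" + i);
        }
        startDaemon(this::bossLoop, "boss");
    }

    int port() {
        return server.socket().getLocalPort();
    }

    private void bossLoop() {
        try {
            while (bossSelector.isOpen()) {
                bossSelector.select();
                bossSelector.selectedKeys().clear();
                SocketChannel child;
                // Step 2: accept all pending connections.
                while ((child = server.accept()) != null) {
                    // Step 3: hand each one to a sub Reactor; step 4 happens in Worker.run().
                    workers[next].register(child);
                    next = (next + 1) % workers.length;
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // Sketch only: stop accepting on any error.
        }
    }

    private static void startDaemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        t.start();
    }
}
```

The boss thread never reads or writes connection data; it only accepts and dispatches, which is the role `ServerBootstrapAcceptor` plays in the walkthrough above.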