Netty(一) — Channel和Unsafe
基于 Netty 4.1.34,从 Channel 的整体架构入手,分析了 Channel 与 Unsafe 的配合机制,以及底层对应的 NIO 实现类。在分析过程中也穿插了对关键方法的说明,最后总结了 Netty Channel 相比原生 JDK Channel 在使用体验和扩展性上的优势。
NIO基础
| Origin | Channel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
|---|---|---|---|---|---|
| client | SocketChannel | Y | Y | Y | |
| server | ServerSocketChannel | Y | |||
| server | SocketChannel | Y | Y |
OP_ACCEPT:当收到一个客户端的连接请求时,该操作就绪。这是ServerSocketChannel上唯一有效的操作。OP_CONNECT:只有客户端SocketChannel会注册该操作,当客户端调用SocketChannel.connect()时,该操作会就绪OP_READ:该操作对客户端和服务端的SocketChannel都有效,当OS的读缓冲区中有数据可读时,该操作就绪OP_WRITE:该操作对客户端和服务端的SocketChannel都有效,当OS的写缓冲区中有空闲的空间时(大部分时候都有),该操作就绪 OP_WRITE 事件相对特殊,一般情况,不应该注册
OP_WRITE事件,OP_WRITE的就绪条件为操作系统内核缓冲区有空闲空间(OP_WRITE事件是在Socket发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT时发生),而写缓冲区绝大部分事件都是有空闲空间的,所以当你注册写事件后,写操作一直就是就绪的,这样会导致Selector处理线程会占用整个CPU的资源。所以OP_WRITE的使用方式是当你确实有数据写入时再注册OP_WRITE事件,并且在写完以后马上取消注册
Channel
io.netty.channel.Channel 是 Netty 中抽象出来的顶层网络连接接口,表示一个客户端连接或一个服务端监听通道(ServerSocket)。Netty 中所有的连接类型( NioSocketChannel、NioServerSocketChannel等)都会实现该接口。
Channel提供了访问其生命周期状态、绑定地址、所属线程模型(EventLoop)、事件处理链(ChannelPipeline)等通用能力。
并且其还实现了ChannelOutboundInvoker,表示它具有出站事件驱动能力。例如 bind()、connect()、write()、flush() 等方法,实际调用时会从 pipeline 的尾部(TailContext)开始,逐步向前传递,最终由 HeadContext 使用 Channel.Unsafe 完成真正的底层 IO 操作。
核心接口
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
// 当前channel的唯一id
ChannelId id();
// 返回分配给 当前Channel 的 EventLoop(事件循环线程)
EventLoop eventLoop();
// 父channel,即创建当前Channel的Channel
// 对于客户端连接(如 NioSocketChannel),其 parent 通常是 NioServerSocketChannel(服务端监听连接)
// 而顶层的Channel则为null(NioServerSocketChannel)
Channel parent();
// Channel 的配置信息,包含连接参数(如 TCP_NODELAY、SO_KEEPALIVE 等)
ChannelConfig config();
boolean isOpen();
// 对于NIO来说,即是否把jdk的Channel注册到Selector上
boolean isRegistered();
// 对于ServerSocketChannel,表示绑定了地址和端口。
// 对于SocketChannel,代表已经连接了服务端
boolean isActive();
// bind的本地SokcetAddress
SocketAddress localAddress();
// 返回远程的 SocketAddress。
// 对于server端来说,没有远端地址,返回null
SocketAddress remoteAddress();
// 一个固定的 ChannelFuture,用于监听当前 Channel 的close事件
// 一旦 Channel close了,该 Future 会完成(成功),可注册监听器处理资源释放等逻辑。
ChannelFuture closeFuture();
// 真的的IO操作接口
Unsafe unsafe();
// 分配给 当前Channel 的 ChannelPipeline
ChannelPipeline pipeline();
}
AbstractChannel
是所有 Netty Channel实现类的公共基类,像 NioSocketChannel、EpollSocketChannel、KQueueSocketChannel 都是基于它扩展的。主要负责抽象出所有 Channel 通用的字段和功能,避免重复代码,也是 ChannelOutboundInvoker 接口的默认实现(转发给Pipeline处理)。其构造方法和常见公共字段如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
// 创建Unsafe
unsafe = newUnsafe();
// 创建DefaultChannelPipeline
pipeline = newChannelPipeline();
}
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
// 内部无回调的 IO 操作
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
// 监听 Channel 关闭状态的 future
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
// 是否已注册到 Selector
private volatile boolean registered;
AbstractNioChannel
这是所有基于 Java NIO 的 Netty Channel 的抽象基类,像 NioSocketChannel、NioServerSocketChannel 都基于它实现。配置Channel非阻塞、注册Selector、添加感兴趣的事件等都在里面完成的。其主要字段和方法如下
public abstract class AbstractNioChannel extends AbstractChannel {
// java原生的channel:SocketChannel和ServerSocketChannel
private final SelectableChannel ch;
// 感兴趣的事件,Server是OP_ACCEPT,SocketChannel是OP_READ
protected final int readInterestOp;
// jdk的channel注册到selector上返回的SelectionKey,用于后续的事件控制
volatile SelectionKey selectionKey;
boolean readPending;
// connect 操作的 promise,即connect的结果回调
private ChannelPromise connectPromise;
// connect操作的超时定时器
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 在这里设置为非阻塞模式
ch.configureBlocking(false);
} catch (IOException e) {
// ... 省略异常
}
}
/**
* 仅注册到Selector上,不监听任何事件
*/
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将java原生的ServerSocketChannel注册到selector上
// 初始没有感兴趣的事件,后续通过 doBeginRead 等方法设置
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) { // 第一次注册抛出异常,删除当前的selectionKey,继续注册
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is
// still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
/*
* 真正操作Channel.read() 或 ChannelHandlerContext.read()时才注册注册事件
* SocketChannel为OP_READ,ServerSocketChannel为OP_ACCEPT
*/
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
// Channel 已失效,忽略
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 如果当前未监听感兴趣的事件,则添加(避免重复设置)
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
NioSocketChannel
是 Netty 中基于 NIO 实现的 TCP 客户端连接类,最终负责与远程建立连接并进行读写通信。它继承自 AbstractNioByteChannel,本质上是对 JDK 的 SocketChannel 的一层封装。
该类主要通过实现 doReadBytes、doWriteBytes 和 doWriteFileRegion 这三个模板方法,完成与底层 java.nio.channels.SocketChannel 的数据交互,使用 Netty 的 ByteBuf 进行读取和写入:
doReadBytes(ByteBuf):从 JDK Channel 中读取数据填充到 ByteBufdoWriteBytes(ByteBuf):从 ByteBuf 中取出数据写入到底层 ChanneldoWriteFileRegion(FileRegion):支持将文件直接写入到 SocketChannel,提高传输效率(zero-copy)。
此外,它还负责实现bind和connect操作、Channel的close、本地和远程地址获取等功能
NioServerSocketChannel
是 Netty 中基于 NIO 实现的服务端监听通道类,负责处理 TCP 连接的接入。它继承自 AbstractNioMessageChannel,封装了 JDK 的 ServerSocketChannel。
该类的核心职责非常单一:监听 OP_ACCEPT 事件并接受新连接。当有客户端连接到来时,会触发 doReadMessages() 方法:
doReadMessages(List<Object> buf):接受底层的 JDKSocketChannel,并将其封装成 Netty 的NioSocketChannel实例添加到buf中,后续由 Acceptor 继续处理。
ServerSocketChannel不会参与具体的数据发送,因此像 connect()、write() 等出站操作在该类中是不支持的(抛 UnsupportedOperationException)
Unsafe
首先其命名和JDK的Unsafe一样,表示它是不安全的,其作为io.netty.channel.Channel的内部接口,同时它的所有实现类也在对应的Channel实现类里,且都是private级别的,也表示了Netty不希望这个类在其他地方使用
JDK的Unsafe是用来操作内存中的指针,而这个Unsafe是用来处理IO的。Netty将处理底层IO的细节都封装在里面,tail和head这两个端点Handler的功能就依赖Unsafe来完成read、connect、write、flush等IO操作
NioByteUnsafe
NioSocketChannel 的专属 Unsafe 实现,负责与底层 SocketChannel 的读写交互和 Netty 的事件触发
重点关注其 read() 方法,该方法会把数据读进 ByteBuf,期间可能多次触发 channelRead(网络可能拆包),用来处理解码和业务逻辑。读完后再触发一次 channelReadComplete。
protected class NioByteUnsafe extends AbstractNioUnsafe {
/**
* NioSocketChannel 的实际读取逻辑,从底层 Channel 中读取字节数据并触发后续处理流程。
*/
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 由于我们不知道当前Channel能读取到多少字节的数据,所以需要分配一个估值容量的ByteBuf(容量过大会浪费,容量过小会读取多次)
// 根据配置参数分配一个具有特定容量ByteBuf,当这个ByteBuf容量小于Channel中的数据时,就会触发多次读取
byteBuf = allocHandle.allocate(allocator);
// 将Channel中的数据读取到ByteBuf中,并且设置读取到的字节数大小
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) { // 没有读取到数据
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
// 读取的字节数小于0,通常表示连接已关闭(EOF)
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 读取数据的次数 + 1
allocHandle.incMessagesRead(1);
readPending = false;
// 传入读取到的ByteBuf作为参数,触发ChannelRead事件(解码、处理业务逻辑等操作)
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 判断是否应该继续从Channel中读取数据
// 当上次读取到的数据容量等于我们准备的ByteBuf容量时,代表很大可能还没读取完,返回true继续读取数据
} while (allocHandle.continueReading());
// 这次Channel发送的数据已被读取完毕,触发一次readComplete事件
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// 没有待处理的read请求 且 当前不再自动读取,则取消 READ 事件监听
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
NioMessageUnsafe
这是 NioServerSocketChannel 专用的 Unsafe 实现类,主要负责处理底层 ServerSocketChannel 的 OP_ACCEPT 事件。其read的逻辑如下:
从底层
ServerSocketChannel接受连接为每个连接创建对应的
NioSocketChannel实例通过
pipeline.fireChannelRead触发事件,把每个NioSocketChannel交给NioServerSocketChannel中唯一的ServerBootstrapAcceptor这个Handler处理
ServerBootstrapAcceptor会将该NioSocketChannel绑定自定义 handler,并注册到对应的EventLoop上最后触发服务端的readComplete事件(里面啥都没做)
总结
通过上面Channel和Unsafe的分析,总结一下相比普通的JDK Channel,Netty的Channel提供了如下特性
- EventLoop线程的绑定:Channel的所有IO操作都在该EventLoop绑定的线程中执行,不会有并发问题
- 事件分发机制:Channel的IO操作基本都会被包装为Netty事件,由对应的ChannelPipeline进行分发
- 更高级的异步编程:Channel的IO操作都是返回的异步结果,方便链式调用与回调监听(就算内部是同步的也是封装成异步结果,保持风格的统一)
- 接口的统一抽象:为了更大力度的能复用代码和最小化代价切换实现类,Channel进行了分层抽象:通用顶层接口和基类 -> 具体的传输层接口和基类(NIO、Epoll、KQueue)-> 具体的 server/客户端 Channel 实现类
- 安全考虑:将操作IO的能力抽象到了Unsafe这个内部类中,算是一种变相的警告