Netty(一) — Channel和Unsafe

​ 基于 Netty 4.1.34,从 Channel 的整体架构入手,分析了 Channel 与 Unsafe 的配合机制,以及底层对应的 NIO 实现类。在分析过程中也穿插了对关键方法的说明,最后总结了 Netty Channel 相比原生 JDK Channel 在使用体验和扩展性上的优势。

NIO基础

Origin Channel OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
client SocketChannel Y Y Y
server ServerSocketChannel Y
server SocketChannel Y Y
  • OP_ACCEPT:当收到一个客户端的连接请求时,该操作就绪。这是ServerSocketChannel上唯一有效的操作。

  • OP_CONNECT:只有客户端SocketChannel会注册该操作,当客户端调用SocketChannel.connect()时,该操作会就绪

  • OP_READ:该操作对客户端和服务端的SocketChannel都有效,当OS的读缓冲区中有数据可读时,该操作就绪

  • OP_WRITE:该操作对客户端和服务端的SocketChannel都有效,当OS的写缓冲区中有空闲的空间时(大部分时候都有),该操作就绪

    ​ OP_WRITE 事件相对特殊,一般情况,不应该注册OP_WRITE事件OP_WRITE的就绪条件为操作系统内核缓冲区有空闲空间(OP_WRITE事件是在Socket发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT时发生),而写缓冲区绝大部分事件都是有空闲空间的,所以当你注册写事件后,写操作一直就是就绪的,这样会导致Selector处理线程会占用整个CPU的资源。所以OP_WRITE的使用方式是当你确实有数据写入时再注册OP_WRITE事件,并且在写完以后马上取消注册

Channel

io.netty.channel.Channel 是 Netty 中抽象出来的顶层网络连接接口,表示一个客户端连接一个服务端监听通道(ServerSocket)。Netty 中所有的连接类型( NioSocketChannelNioServerSocketChannel等)都会实现该接口

​ Channel提供了访问其生命周期状态绑定地址所属线程模型EventLoop)、事件处理链ChannelPipeline)等通用能力。

​ 并且其还实现了ChannelOutboundInvoker,表示它具有出站事件驱动能力。例如 bind()connect()write()flush() 等方法,实际调用时会从 pipeline 的尾部(TailContext)开始,逐步向前传递,最终由 HeadContext 使用 Channel.Unsafe 完成真正的底层 IO 操作。

核心接口


public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    // 当前channel的唯一id
    ChannelId id();

    // 返回分配给 当前Channel 的 EventLoop(事件循环线程)
    EventLoop eventLoop();

    // 父channel,即创建当前Channel的Channel
    // 对于客户端连接(如 NioSocketChannel),其 parent 通常是 NioServerSocketChannel(服务端监听连接)
    // 而顶层的Channel则为null(NioServerSocketChannel)
    Channel parent();

    // Channel 的配置信息,包含连接参数(如 TCP_NODELAY、SO_KEEPALIVE 等)
    ChannelConfig config();

    boolean isOpen();

    // 对于NIO来说,即是否把jdk的Channel注册到Selector上
    boolean isRegistered();

    // 对于ServerSocketChannel,表示绑定了地址和端口。
    // 对于SocketChannel,代表已经连接了服务端
    boolean isActive();

    // bind的本地SokcetAddress
    SocketAddress localAddress();

    // 返回远程的 SocketAddress。
    // 对于server端来说,没有远端地址,返回null
    SocketAddress remoteAddress();

    // 一个固定的 ChannelFuture,用于监听当前 Channel 的close事件
    // 一旦 Channel close了,该 Future 会完成(成功),可注册监听器处理资源释放等逻辑。
    ChannelFuture closeFuture();

    // 真的的IO操作接口
    Unsafe unsafe();

    // 分配给 当前Channel 的 ChannelPipeline
    ChannelPipeline pipeline();
}

AbstractChannel

​ 是所有 Netty Channel实现类的公共基类,像 NioSocketChannelEpollSocketChannelKQueueSocketChannel 都是基于它扩展的。主要负责抽象出所有 Channel 通用的字段和功能,避免重复代码,也是 ChannelOutboundInvoker 接口的默认实现(转发给Pipeline处理)。其构造方法和常见公共字段如下:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // 创建Unsafe
    unsafe = newUnsafe();
    // 创建DefaultChannelPipeline
    pipeline = newChannelPipeline();
}

private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
// 内部无回调的 IO 操作
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
// 监听 Channel 关闭状态的 future
private final CloseFuture closeFuture = new CloseFuture(this);
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
// 是否已注册到 Selector
private volatile boolean registered;

AbstractNioChannel

​ 这是所有基于 Java NIO 的 Netty Channel 的抽象基类,像 NioSocketChannelNioServerSocketChannel 都基于它实现。配置Channel非阻塞、注册Selector、添加感兴趣的事件等都在里面完成的。其主要字段和方法如下

public abstract class AbstractNioChannel extends AbstractChannel {

    // java原生的channel:SocketChannel和ServerSocketChannel
    private final SelectableChannel ch;
    // 感兴趣的事件,Server是OP_ACCEPT,SocketChannel是OP_READ
    protected final int readInterestOp;
    // jdk的channel注册到selector上返回的SelectionKey,用于后续的事件控制
    volatile SelectionKey selectionKey;
    boolean readPending;
    // connect 操作的 promise,即connect的结果回调
    private ChannelPromise connectPromise;
    // connect操作的超时定时器
    private ScheduledFuture<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // 在这里设置为非阻塞模式
            ch.configureBlocking(false);
        } catch (IOException e) {
            // ... 省略异常
        }
    }

    /**
     * 仅注册到Selector上,不监听任何事件
     */
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 将java原生的ServerSocketChannel注册到selector上
                // 初始没有感兴趣的事件,后续通过 doBeginRead 等方法设置
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) { // 第一次注册抛出异常,删除当前的selectionKey,继续注册
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is
                    // still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

    /*
     * 真正操作Channel.read() 或 ChannelHandlerContext.read()时才注册注册事件
     * SocketChannel为OP_READ,ServerSocketChannel为OP_ACCEPT
     */
    @Override
    protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            // Channel 已失效,忽略
            return;
        }
        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 如果当前未监听感兴趣的事件,则添加(避免重复设置)
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
NioSocketChannel

​ 是 Netty 中基于 NIO 实现的 TCP 客户端连接类,最终负责与远程建立连接并进行读写通信。它继承自 AbstractNioByteChannel,本质上是对 JDK 的 SocketChannel 的一层封装。

该类主要通过实现 doReadBytesdoWriteBytesdoWriteFileRegion 这三个模板方法,完成与底层 java.nio.channels.SocketChannel 的数据交互,使用 Netty 的 ByteBuf 进行读取和写入

  • doReadBytes(ByteBuf)从 JDK Channel 中读取数据填充到 ByteBuf
  • doWriteBytes(ByteBuf)从 ByteBuf 中取出数据写入到底层 Channel
  • doWriteFileRegion(FileRegion):支持将文件直接写入到 SocketChannel,提高传输效率(zero-copy)。

此外,它还负责实现bind和connect操作、Channel的close、本地和远程地址获取等功能

NioServerSocketChannel

​ 是 Netty 中基于 NIO 实现的服务端监听通道类,负责处理 TCP 连接的接入。它继承自 AbstractNioMessageChannel,封装了 JDK 的 ServerSocketChannel

​ 该类的核心职责非常单一:监听 OP_ACCEPT 事件并接受新连接。当有客户端连接到来时,会触发 doReadMessages() 方法:

  • doReadMessages(List<Object> buf)接受底层的 JDK SocketChannel,并将其封装成 Netty 的 NioSocketChannel 实例添加到 buf 中,后续由 Acceptor 继续处理

ServerSocketChannel不会参与具体的数据发送,因此像 connect()write() 等出站操作在该类中是不支持的(抛 UnsupportedOperationException

Unsafe

​ 首先其命名和JDK的Unsafe一样,表示它是不安全的,其作为io.netty.channel.Channel的内部接口,同时它的所有实现类也在对应的Channel实现类里,且都是private级别的,也表示了Netty不希望这个类在其他地方使用
JDK的Unsafe是用来操作内存中的指针,而这个Unsafe是用来处理IO的。Netty将处理底层IO的细节都封装在里面,tail和head这两个端点Handler的功能就依赖Unsafe来完成read、connect、write、flush等IO操作

NioByteUnsafe

NioSocketChannel 的专属 Unsafe 实现,负责与底层 SocketChannel 的读写交互和 Netty 的事件触发

​ 重点关注其 read() 方法,该方法会把数据读进 ByteBuf,期间可能多次触发 channelRead(网络可能拆包),用来处理解码和业务逻辑。读完后再触发一次 channelReadComplete

protected class NioByteUnsafe extends AbstractNioUnsafe {
    /**
     * NioSocketChannel 的实际读取逻辑,从底层 Channel 中读取字节数据并触发后续处理流程。
     */
    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                // 由于我们不知道当前Channel能读取到多少字节的数据,所以需要分配一个估值容量的ByteBuf(容量过大会浪费,容量过小会读取多次)
                // 根据配置参数分配一个具有特定容量ByteBuf,当这个ByteBuf容量小于Channel中的数据时,就会触发多次读取
                byteBuf = allocHandle.allocate(allocator);
                // 将Channel中的数据读取到ByteBuf中,并且设置读取到的字节数大小
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) { // 没有读取到数据
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    // 读取的字节数小于0,通常表示连接已关闭(EOF)
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
                // 读取数据的次数 + 1
                allocHandle.incMessagesRead(1);
                readPending = false;
                // 传入读取到的ByteBuf作为参数,触发ChannelRead事件(解码、处理业务逻辑等操作)
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
                // 判断是否应该继续从Channel中读取数据
                // 当上次读取到的数据容量等于我们准备的ByteBuf容量时,代表很大可能还没读取完,返回true继续读取数据
            } while (allocHandle.continueReading());
            // 这次Channel发送的数据已被读取完毕,触发一次readComplete事件
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // 没有待处理的read请求 且 当前不再自动读取,则取消 READ 事件监听
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}
 

NioMessageUnsafe

​ 这是 NioServerSocketChannel 专用的 Unsafe 实现类,主要负责处理底层 ServerSocketChannel 的 OP_ACCEPT 事件。其read的逻辑如下:

  1. 底层 ServerSocketChannel 接受连接

  2. 每个连接创建对应的 NioSocketChannel 实例

  3. 通过 pipeline.fireChannelRead 触发事件,把每个NioSocketChannel交给NioServerSocketChannel中唯一的 ServerBootstrapAcceptor 这个Handler处理

    ServerBootstrapAcceptor 会将该 NioSocketChannel 绑定自定义 handler,并注册到对应的 EventLoop

  4. 最后触发服务端的readComplete事件(里面啥都没做)

总结

通过上面Channel和Unsafe的分析,总结一下相比普通的JDK Channel,Netty的Channel提供了如下特性

  1. EventLoop线程的绑定:Channel的所有IO操作都在该EventLoop绑定的线程中执行,不会有并发问题
  2. 事件分发机制:Channel的IO操作基本都会被包装为Netty事件,由对应的ChannelPipeline进行分发
  3. 高级的异步编程:Channel的IO操作都是返回的异步结果,方便链式调用与回调监听(就算内部是同步的也是封装成异步结果,保持风格的统一)
  4. 接口的统一抽象:为了更大力度的能复用代码和最小化代价切换实现类,Channel进行了分层抽象:通用顶层接口和基类 -> 具体的传输层接口和基类(NIO、Epoll、KQueue)-> 具体的 server/客户端 Channel 实现类
  5. 安全考虑:将操作IO的能力抽象到了Unsafe这个内部类中,算是一种变相的警告