Seata — 核心组件和启动流程

​ 基于 Seata 1.7.1 源码,这篇文章算是 AT 模式源码解析的前置内容。文章先从 自动配置类 切入,梳理了客户端核心组件(TM、RM)的职责和初始化流程,接着分析 AT 模式下 DataSource 代理的实现原理,最后再简单介绍了服务端的启动过程以及 TC 的创建逻辑

客户端

​ 当项目引入 seata-spring-boot-starter 后,当前模块就具备了分布式事务的能力,可以看作是 Seata 客户端。这个 Starter 主要加载 SeataAutoConfigurationSeataDataSourceAutoConfiguration 两个自动配置类,用于提供分布式事务相关的核心功能

SeataAutoConfiguration

SeataAutoConfiguration 是客户端的核心配置类,所有事务模式都依赖它。其中注册了一个关键 GlobalTransactionScanner 这个具备代理功能的Bean,它承担两个核心职责

  • 对注解 @GlobalTransactional@GlobalLock 的方法进行代理
  • 完成 TM 和 RM 的初始化

注解代理

GlobalTransactionScanner 继承了 AbstractAutoProxyCreator,因此它具备 Spring 的 自动代理功能。重点看它的 wrapIfNecessary 方法,有两个关键点:

  1. 代理逻辑判断

    如果Class或Method上有 @GlobalTransactional@GlobalLock,就会创建一个代理,并加入 GlobalTransactionalInterceptor 作为拦截器

  2. Advisor 插入顺序

    ​ 调用 GlobalTransactionScanner#findAddSeataAdvisorPosition 来确定 Advisor 的位置

    ​ 这里很重要,因为 GlobalTransactionalInterceptor 的位置是 SeataInterceptorPosition.BeforeTransaction,意味着它必须在 Spring 的事务切面(@Transactional之前执行
    ​ 因为当 @GlobalTransactional@Transactional 同时使用时,Seata 的全局事务逻辑需要先于本地事务,否则全局事务无法覆盖本地事务的执行范围

客户端初始化

​ Seata 客户端和服务端之间基于 Netty 长连接 进行通信。为了完成分布式事务协调,客户端抽象出两个核心角色:

  • TM(Transaction Manager):负责全局事务的开始、提交、回滚
  • RM(Resource Manager):负责分支事务的注册、回滚、提交

​ 这两个角色分别由 TMClientRMClient 管理,并在 GlobalTransactionScanner#afterPropertiesSet 中完成初始化。初始化过程中,每个角色会创建独立的 Netty 客户端(Bootstrap),其连接由 NettyClientChannelManager 统一管理,底层通过 Apache Commons Pool 实现连接池化,且按照 serverAddress(ip:port) 维度复用 Channel,确保一个服务端节点仅维护一条长连接,从而兼顾性能与资源利用率

TMClient
TM(Transaction Manager)

​ 事务管理器,接口为TransactionManager,可以理解成分布式事务版的 Spring PlatformTransactionManager。它负责发起、提交、回滚全局事务。区别是 Spring 靠数据库事务,Seata 需要服务端(协调者)来兜底。且只有全局事务的发起者才会触发 TM 的 begin / commit / rollback。代码里 @GlobalTransactional 切面就是通过 GlobalTransactionalInterceptor 调用 TM,默认实现是 DefaultTransactionManager

public interface TransactionManager {

    String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException;

    GlobalStatus commit(String xid) throws TransactionException;

    GlobalStatus rollback(String xid) throws TransactionException;

    // 查询全局事务状态
    GlobalStatus getStatus(String xid) throws TransactionException;

    // 上报全局事务状态
    GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}
初始化

由TMClient#init方法触发TMClient的初始化,其核心逻辑如下

  1. 创建TmNettyRemotingClient对象(单例)

    ​ 主要是设置好一些参数,包括Netty相关的NioEventLoopGroup、使用AbstractNettyRemotingClient$ClientHandler作为ChannelPipeline的ChannelHandler等等;并配置好NettyClientChannelManager对象用来管理Netty Channel

  2. TMClient真正初始化

    1. 注册ResponseProcessor,将其缓存到AbstractNettyRemoting#processorTable字段中,用来处理服务端返回的各种响应,具体类型如下

      MessageType.TYPE_SEATA_MERGE_RESULT
      MessageType.TYPE_GLOBAL_BEGIN_RESULT # 全局事务开启的结果
      MessageType.TYPE_GLOBAL_COMMIT_RESULT # 全局事务提交的结果
      MessageType.TYPE_GLOBAL_REPORT_RESULT
      MessageType.TYPE_GLOBAL_ROLLBACK_RESULT # 全局事务回滚的结果
      MessageType.TYPE_GLOBAL_STATUS_RESULT
      MessageType.TYPE_REG_CLT_RESULT
      
    2. 开启一些定时任务:包括与服务端Channel的reconnect任务、MergedSend任务(处理批量消息发送)、异步请求Future的超时检测任务。

    3. 初始化TM相关的netty客户端的Bootstrap,主要配置了一些如下的ChannelHandler:

      • IdleStateHandler:心跳检测,配合 NettyClientChannelManager 保证连接可用
      • ProtocolV1Encoder和ProtocolV1Decoder:处理消息编解码
      • ClientHandler:处理服务端的响应,以及 IdleStateEvent 事件等等
RMClient
RM(Resource Manager)

​ 资源管理器,接口为 ResourceManager,负责管理分支事务资源。它与 TC(事务协调器) 交互,用于:注册分支事务上报分支事务状态驱动分支事务提交或回滚。ResourceManager继承了两个接口:

  • ResourceManagerInbound:作为 Inbound,处理 TC 发起的分支事务 commitrollback 请求
  • ResourceManagerOutbound:作为 Outbound,主动向 TC 注册分支事务、上报事务状态,以及执行全局锁查询

​ 在 Seata 中,每个DataSource都被视为一个资源,而 ResourceManager 则负责操作这些资源。代码上通过 代理 DataSource 来实现 ResourceManager 的功能。默认实现是 DefaultResourceManager,它充当适配器,根据不同事务模式选择不同的具体实现

  • AT 模式:使用 DataSourceManager
  • TCC 模式:使用 TCCResourceManager
public interface ResourceManager extends ResourceManagerInbound, ResourceManagerOutbound {

    // 向 TC 注册资源
    void registerResource(Resource resource);

    void unregisterResource(Resource resource);

    Map<String, Resource> getManagedResources();

    // 获取分支事务类型。例如:AT、TCC等
    BranchType getBranchType();

    // ============================== ResourceManagerInbound ==============================
    // 处理 TC(事务协调器)发起的分支事务commit请求
    BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;

    // 处理 TC(事务协调器)发起的分支事务roillback请求
    BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;

    // ============================== ResourceManagerOutbound功能 ==============================

    // 向 TC 注册分支事务
    Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException;
    // 向 TC 上报分支事务状态
    void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status, String applicationData) throws TransactionException;
    // 向 TC 查询指定锁是否已被占用
    boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException;
}
初始化

RMClient#init 方法触发 RMClient 初始化,其创建的 RmNettyRemotingClient 和 TMClient 的 TmNettyRemotingClient 都继承自 AbstractNettyRemotingClient,所以整体流程类似 TMClient。不同点主要有:

  1. 设置DefaultRMHandler为其AbstractNettyRemotingClient#transactionMessageHandler

    DefaultRMHandler 本身是一个适配器,内部通过 SPI 加载对应 BranchType 的真正处理器(如 AT 模式使用 RMHandlerAT)。当接收到 TC 消息时,会根据 BranchType 找到对应的 RMHandler 去处理请求

  2. 注册的ResponseProcessor不同,registerProcessor方法如下,可以发现对分支事务的commit,rollback和UndoLog的删除都是用了messageExecutor来异步处理,这三个功能都会访问数据库,这里采用异步处理以避免网络IO阻塞 Netty IO 线程,导致Channel的吞吐量下降

    private void registerProcessor() {
        // 注册客户端事务 commit,rollback,undo log信息处理器(服务端主动发送的消息,对全局事务进行提交,回滚等操作)
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(),
                this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    
        // 注册服务端响应消息的处理器(RM发送事务相关请求,服务端会响应)
        ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(),
                getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        // 5.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
    

SeataDataSourceAutoConfiguration

​ 这个配置类主要是为了支持 ATXA 事务模式。它内部引入了 SeataAutoDataSourceProxyCreator,这是个带代理功能的 Bean,专门负责对 DataSource 进行代理。这个类会创建一个 SeataAutoDataSourceProxyAdvice 切面,把对 DataSource 的调用转发给对应的代理类(AT 模式用 DataSourceProxy,XA 模式用 DataSourceProxyXA

SQL执行代理

​ 在 Java 的 SQL 执行流程里,其实一直是这一套: DataSource -> Connection -> PreparedStatement

​ DataSource 负责拿 Connection,Connection 负责创建 PreparedStatement 并处理事务,PreparedStatement 执行 SQL。所以 Seata 就对这几个关键接口都做了代理,目的就是 加上分布式事务的控制逻辑

  • DataSource → DataSourceProxy

    ​ DataSourceProxy 会在原有逻辑上,额外把当前 RM 注册到 TC,并把原始 Connection 包装成 ConnectionProxy,用于支持后续事务功能

  • Connection → ConnectionProxy

    ​ ConnectionProxy 会增强 commit 和 rollback 的逻辑。还会对获取PreparedStatement时将其包装成 PreparedStatementProxy用于其SQL执行的增强

    • commit:不仅提交,还会注册分支事务、刷新 undo_log
    • rollback:除了回滚,还会上报当前分支事务的状态,驱动全局事务回滚
  • PreparedStatement → PreparedStatementProxy

    主要是在 SQL 执行前后做镜像查询,用来生成 undo_log,以便后续能回滚

服务端

​ Seata 作为分布式事务解决方案,需要一个 服务端协调者 来统一管理整个分布式事务上下文中各微服务的事务状态。因此,Seata 提供了一个独立的 Spring Boot 服务,由 seata-server 模块中的 ServerApplication 启动,最终触发 io.seata.server.Server#start 方法来启动服务端

public static void start(String[] args) {
    // 初始化参数解析器,解析启动参数
    ParameterParser parameterParser = new ParameterParser(args);

    // 统计相关功能初始化
    MetricsManager.get().init();

    ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
            NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // 确定 Server 的 IP
    if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
        XID.setIpAddress(parameterParser.getHost());
    } else {
        String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
        if (StringUtils.isNotBlank(preferredNetworks)) {
            XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
    }
    // 创建 Netty 服务端对象,负责接收客户端(RM/TM)的请求
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
    XID.setPort(nettyRemotingServer.getListenPort());
    UUIDGenerator.init(parameterParser.getServerNode());
    SessionHolder.init();
    LockerManagerFactory.init();
    // 创建TC
    DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
    // 启动一些定时任务,包括异步全局事务commit操作、超时检查等
    coordinator.init();
    // 将 Coordinator 设置为 Netty 的消息处理器
    nettyRemotingServer.setHandler(coordinator);

    ServerRunner.addDisposable(coordinator);

    // 注册消息处理器并构建netty的ServerBootstrap,开启端口的监听
    nettyRemotingServer.init();
}

​ 功能看着挺多的,我们只需重点关注服务端Netty的配置、TC的创建和初始化,还有接收到客户端的请求后是如何协调TC来处理对应的逻辑

NettyRemotingServer创建

NettyRemotingServer实例化做的事情不多,包括如下:

  • 提供一个messageExecutor线程池,用于异步处理客户端请求的部分消息,避免阻塞 Netty IO 线程
  • 构建 Netty ServerBootstrap,创建 bossGroupworkerGroup,并使用 ServerHandler 处理客户端消息。这个 ServerHandler 处理消息的逻辑和客户端的 ClientHandler 一致,都是调用 AbstractNettyRemoting#processMessage,再由后续配置的消息 Processor和对应的线程池去执行具体的业务处理

TC创建和初始化

​ TC(Transaction Coordinator,事务协调者)负责维护 全局事务分支事务 的状态,并驱动全局事务的提交或回滚,其实现了TransactionMessageHandler接口,是服务端所有消息处理的入口,实现为DefaultCoordinator。其主要职责分类为:

  • 与 RM 交互:处理分支事务的注册、提交和回滚,当 TM 发起全局事务的 commit 或 rollback 时,TC 会进一步驱动各分支事务完成相应操作
  • 与 TM 交互:负责全局事务的开启、提交、回滚等操作,保证全局事务状态一致

​ TC内部有很多单线程定时任务线程池,在其初始化阶段为启动一些列的定时任务,包括:全局事务的异步commit、客户端undo_log的删除等等

服务端Netty启动

  • 和 TMClient、RMClient 类似,服务端同样需要注册一系列消息处理器(Processor)。这些处理器最终会委托给 DefaultCoordinator(实现了 TransactionMessageHandler)来完成消息处理逻辑
  • 启动 NettyRemotingServer,其内部基于 ServerBootstrap 初始化并绑定到本地端口绑定成功后,通过 RegistryService#register 将当前服务节点注册到注册中心(如 Nacos、Consul、Zookeeper 等)。到此服务端的整体就启动完成了,可以对外提供服务了