Seata — 核心组件和启动流程
基于 Seata 1.7.1 源码,这篇文章算是 AT 模式源码解析的前置内容。文章先从 自动配置类 切入,梳理了客户端核心组件(TM、RM)的职责和初始化流程,接着分析 AT 模式下 DataSource 代理的实现原理,最后再简单介绍了服务端的启动过程以及 TC 的创建逻辑
客户端
当项目引入 seata-spring-boot-starter 后,当前模块就具备了分布式事务的能力,可以看作是 Seata 客户端。这个 Starter 主要加载 SeataAutoConfiguration 和 SeataDataSourceAutoConfiguration 两个自动配置类,用于提供分布式事务相关的核心功能
SeataAutoConfiguration
SeataAutoConfiguration 是客户端的核心配置类,所有事务模式都依赖它。其中注册了一个关键 GlobalTransactionScanner 这个具备代理功能的Bean,它承担两个核心职责
- 对注解
@GlobalTransactional和@GlobalLock的方法进行代理 - 完成 TM 和 RM 的初始化
注解代理
GlobalTransactionScanner 继承了 AbstractAutoProxyCreator,因此它具备 Spring 的 自动代理功能。重点看它的 wrapIfNecessary 方法,有两个关键点:
代理逻辑判断
如果Class或Method上有
@GlobalTransactional或@GlobalLock,就会创建一个代理,并加入GlobalTransactionalInterceptor作为拦截器Advisor 插入顺序
调用
GlobalTransactionScanner#findAddSeataAdvisorPosition来确定 Advisor 的位置 这里很重要,因为
GlobalTransactionalInterceptor的位置是SeataInterceptorPosition.BeforeTransaction,意味着它必须在 Spring 的事务切面(@Transactional)之前执行。
因为当@GlobalTransactional和@Transactional同时使用时,Seata 的全局事务逻辑需要先于本地事务,否则全局事务无法覆盖本地事务的执行范围
客户端初始化
Seata 客户端和服务端之间基于 Netty 长连接 进行通信。为了完成分布式事务协调,客户端抽象出两个核心角色:
- TM(Transaction Manager):负责全局事务的开始、提交、回滚
- RM(Resource Manager):负责分支事务的注册、回滚、提交
这两个角色分别由 TMClient 和 RMClient 管理,并在 GlobalTransactionScanner#afterPropertiesSet 中完成初始化。初始化过程中,每个角色会创建独立的 Netty 客户端(Bootstrap),其连接由 NettyClientChannelManager 统一管理,底层通过 Apache Commons Pool 实现连接池化,且按照 serverAddress(ip:port) 维度复用 Channel,确保一个服务端节点仅维护一条长连接,从而兼顾性能与资源利用率
TMClient
TM(Transaction Manager)
事务管理器,接口为TransactionManager,可以理解成分布式事务版的 Spring PlatformTransactionManager。它负责发起、提交、回滚全局事务。区别是 Spring 靠数据库事务,Seata 需要服务端(协调者)来兜底。且只有全局事务的发起者才会触发 TM 的 begin / commit / rollback。代码里 @GlobalTransactional 切面就是通过 GlobalTransactionalInterceptor 调用 TM,默认实现是 DefaultTransactionManager
public interface TransactionManager {
String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException;
GlobalStatus commit(String xid) throws TransactionException;
GlobalStatus rollback(String xid) throws TransactionException;
// 查询全局事务状态
GlobalStatus getStatus(String xid) throws TransactionException;
// 上报全局事务状态
GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}
初始化
由TMClient#init方法触发TMClient的初始化,其核心逻辑如下
创建TmNettyRemotingClient对象(单例)
主要是设置好一些参数,包括Netty相关的NioEventLoopGroup、使用AbstractNettyRemotingClient$ClientHandler作为ChannelPipeline的ChannelHandler等等;并配置好NettyClientChannelManager对象用来管理Netty Channel
TMClient真正初始化
注册ResponseProcessor,将其缓存到AbstractNettyRemoting#processorTable字段中,用来处理服务端返回的各种响应,具体类型如下
MessageType.TYPE_SEATA_MERGE_RESULT MessageType.TYPE_GLOBAL_BEGIN_RESULT # 全局事务开启的结果 MessageType.TYPE_GLOBAL_COMMIT_RESULT # 全局事务提交的结果 MessageType.TYPE_GLOBAL_REPORT_RESULT MessageType.TYPE_GLOBAL_ROLLBACK_RESULT # 全局事务回滚的结果 MessageType.TYPE_GLOBAL_STATUS_RESULT MessageType.TYPE_REG_CLT_RESULT开启一些定时任务:包括与服务端Channel的reconnect任务、MergedSend任务(处理批量消息发送)、异步请求Future的超时检测任务。
初始化TM相关的netty客户端的Bootstrap,主要配置了一些如下的ChannelHandler:
- IdleStateHandler:心跳检测,配合
NettyClientChannelManager保证连接可用 - ProtocolV1Encoder和ProtocolV1Decoder:处理消息编解码
- ClientHandler:处理服务端的响应,以及 IdleStateEvent 事件等等
- IdleStateHandler:心跳检测,配合
RMClient
RM(Resource Manager)
资源管理器,接口为 ResourceManager,负责管理分支事务资源。它与 TC(事务协调器) 交互,用于:注册分支事务、上报分支事务状态、驱动分支事务提交或回滚。ResourceManager继承了两个接口:
- ResourceManagerInbound:作为 Inbound,处理 TC 发起的分支事务
commit和rollback请求 - ResourceManagerOutbound:作为 Outbound,主动向 TC 注册分支事务、上报事务状态,以及执行全局锁查询
在 Seata 中,每个DataSource都被视为一个资源,而 ResourceManager 则负责操作这些资源。代码上通过 代理 DataSource 来实现 ResourceManager 的功能。默认实现是 DefaultResourceManager,它充当适配器,根据不同事务模式选择不同的具体实现
- AT 模式:使用
DataSourceManager - TCC 模式:使用
TCCResourceManager
public interface ResourceManager extends ResourceManagerInbound, ResourceManagerOutbound {
// 向 TC 注册资源
void registerResource(Resource resource);
void unregisterResource(Resource resource);
Map<String, Resource> getManagedResources();
// 获取分支事务类型。例如:AT、TCC等
BranchType getBranchType();
// ============================== ResourceManagerInbound ==============================
// 处理 TC(事务协调器)发起的分支事务commit请求
BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;
// 处理 TC(事务协调器)发起的分支事务roillback请求
BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;
// ============================== ResourceManagerOutbound功能 ==============================
// 向 TC 注册分支事务
Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException;
// 向 TC 上报分支事务状态
void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status, String applicationData) throws TransactionException;
// 向 TC 查询指定锁是否已被占用
boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException;
}
初始化
RMClient#init 方法触发 RMClient 初始化,其创建的 RmNettyRemotingClient 和 TMClient 的 TmNettyRemotingClient 都继承自 AbstractNettyRemotingClient,所以整体流程类似 TMClient。不同点主要有:
设置DefaultRMHandler为其AbstractNettyRemotingClient#transactionMessageHandler
DefaultRMHandler本身是一个适配器,内部通过 SPI 加载对应BranchType的真正处理器(如 AT 模式使用RMHandlerAT)。当接收到 TC 消息时,会根据BranchType找到对应的 RMHandler 去处理请求注册的ResponseProcessor不同,registerProcessor方法如下,可以发现对分支事务的commit,rollback和UndoLog的删除都是用了messageExecutor来异步处理,这三个功能都会访问数据库,这里采用异步处理以避免网络IO阻塞 Netty IO 线程,导致Channel的吞吐量下降
private void registerProcessor() { // 注册客户端事务 commit,rollback,undo log信息处理器(服务端主动发送的消息,对全局事务进行提交,回滚等操作) RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this); super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor); RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this); super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor); RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler()); super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor); // 注册服务端响应消息的处理器(RM发送事务相关请求,服务端会响应) ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler()); super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null); // 5.registry heartbeat message processor ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor(); super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null); }
SeataDataSourceAutoConfiguration
这个配置类主要是为了支持 AT 或 XA 事务模式。它内部引入了 SeataAutoDataSourceProxyCreator,这是个带代理功能的 Bean,专门负责对 DataSource 进行代理。这个类会创建一个 SeataAutoDataSourceProxyAdvice 切面,把对 DataSource 的调用转发给对应的代理类(AT 模式用 DataSourceProxy,XA 模式用 DataSourceProxyXA)
SQL执行代理
在 Java 的 SQL 执行流程里,其实一直是这一套: DataSource -> Connection -> PreparedStatement
DataSource 负责拿 Connection,Connection 负责创建 PreparedStatement 并处理事务,PreparedStatement 执行 SQL。所以 Seata 就对这几个关键接口都做了代理,目的就是 加上分布式事务的控制逻辑
DataSource → DataSourceProxy
DataSourceProxy 会在原有逻辑上,额外把当前 RM 注册到 TC,并把原始 Connection 包装成 ConnectionProxy,用于支持后续事务功能
Connection → ConnectionProxy
ConnectionProxy 会增强 commit 和 rollback 的逻辑。还会对获取PreparedStatement时将其包装成 PreparedStatementProxy用于其SQL执行的增强
- commit:不仅提交,还会注册分支事务、刷新 undo_log
- rollback:除了回滚,还会上报当前分支事务的状态,驱动全局事务回滚
PreparedStatement → PreparedStatementProxy
主要是在 SQL 执行前后做镜像查询,用来生成 undo_log,以便后续能回滚
服务端
Seata 作为分布式事务解决方案,需要一个 服务端协调者 来统一管理整个分布式事务上下文中各微服务的事务状态。因此,Seata 提供了一个独立的 Spring Boot 服务,由 seata-server 模块中的 ServerApplication 启动,最终触发 io.seata.server.Server#start 方法来启动服务端
public static void start(String[] args) {
// 初始化参数解析器,解析启动参数
ParameterParser parameterParser = new ParameterParser(args);
// 统计相关功能初始化
MetricsManager.get().init();
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
new ThreadPoolExecutor.CallerRunsPolicy());
// 确定 Server 的 IP
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
if (StringUtils.isNotBlank(preferredNetworks)) {
XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
}
// 创建 Netty 服务端对象,负责接收客户端(RM/TM)的请求
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
XID.setPort(nettyRemotingServer.getListenPort());
UUIDGenerator.init(parameterParser.getServerNode());
SessionHolder.init();
LockerManagerFactory.init();
// 创建TC
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
// 启动一些定时任务,包括异步全局事务commit操作、超时检查等
coordinator.init();
// 将 Coordinator 设置为 Netty 的消息处理器
nettyRemotingServer.setHandler(coordinator);
ServerRunner.addDisposable(coordinator);
// 注册消息处理器并构建netty的ServerBootstrap,开启端口的监听
nettyRemotingServer.init();
}
功能看着挺多的,我们只需重点关注服务端Netty的配置、TC的创建和初始化,还有接收到客户端的请求后是如何协调TC来处理对应的逻辑
NettyRemotingServer创建
NettyRemotingServer实例化做的事情不多,包括如下:
- 提供一个messageExecutor线程池,用于异步处理客户端请求的部分消息,避免阻塞 Netty IO 线程
- 构建 Netty
ServerBootstrap,创建bossGroup和workerGroup,并使用ServerHandler处理客户端消息。这个ServerHandler处理消息的逻辑和客户端的ClientHandler一致,都是调用AbstractNettyRemoting#processMessage,再由后续配置的消息 Processor和对应的线程池去执行具体的业务处理
TC创建和初始化
TC(Transaction Coordinator,事务协调者)负责维护 全局事务 和 分支事务 的状态,并驱动全局事务的提交或回滚,其实现了TransactionMessageHandler接口,是服务端所有消息处理的入口,实现为DefaultCoordinator。其主要职责分类为:
- 与 RM 交互:处理分支事务的注册、提交和回滚,当 TM 发起全局事务的 commit 或 rollback 时,TC 会进一步驱动各分支事务完成相应操作
- 与 TM 交互:负责全局事务的开启、提交、回滚等操作,保证全局事务状态一致
TC内部有很多单线程定时任务线程池,在其初始化阶段为启动一些列的定时任务,包括:全局事务的异步commit、客户端undo_log的删除等等
服务端Netty启动
- 和 TMClient、RMClient 类似,服务端同样需要注册一系列消息处理器(Processor)。这些处理器最终会委托给
DefaultCoordinator(实现了TransactionMessageHandler)来完成消息处理逻辑 - 启动
NettyRemotingServer,其内部基于ServerBootstrap初始化并绑定到本地端口。绑定成功后,通过RegistryService#register将当前服务节点注册到注册中心(如 Nacos、Consul、Zookeeper 等)。到此服务端的整体就启动完成了,可以对外提供服务了