Seata — AT模式一阶段全解析
基于 Seata 1.7.1 源码,本篇文章详细拆解了一阶段分布式事务的执行流程,重点包括 降级检查策略、undo log 的生成与管理、非本地事务运行模式下的支持、锁冲突检测及重试机制,以及一阶段事务如何决定提交或回滚,为理解 AT 模式的核心原理提供了全面解析
Seata 的 AT 模式本质上就是在一阶段做足准备,二阶段才能高效提交或回滚。我们从@GlobalTransactional 入手,来看看 GlobalTransactionalInterceptor 在分布式事务的一阶段里到底做了什么
降级检查
GlobalTransactionalInterceptor 会在方法执行前先判断是否处于降级状态,如果已经降级,就直接跳过全局事务逻辑,改走本地事务,避免全局事务挂掉时影响业务。其降级检查相关参数和方法如下,核心逻辑如下:
启动定时任务,定期做可用性检测
在
GlobalTransactionalInterceptor构造方法里,会调用startDegradeCheck(),开启一个定时任务。这个任务每隔一段时间,创建一个空的全局事务并马上提交,以此测试 TC 是否正常 另外,业务中的全局事务执行完成后,也会根据执行结果发布DegradeCheckEvent事件
根据事件统计状态
通过监听
DegradeCheckEvent,统计连续失败次数degradeNum和连续成功次数reachNum- 降级:失败次数达到degradeCheckAllowTimes(默认 10 次)
- 恢复:连续成功达到degradeCheckAllowTimes(同样是 10 次)
// 降级检查任务执行的时间间隔(单位:毫秒)
// 仅当 degradeCheck = true 时有效,默认值为 2000 毫秒
private static int degradeCheckPeriod;
// 是否启用降级检查策略的动态开关
// 受配置中心的client.tm.degradeCheck参数控制
private static final AtomicBoolean ATOMIC_DEGRADE_CHECK = new AtomicBoolean(false);
// 降级检查失败允许的最大次数
// 当连续失败次数 >= 此值时,会触发事务模式降级(从全局事务退化为本地事务)
// 仅当 degradeCheck = true 时有效,默认值为 10
private static int degradeCheckAllowTimes;
// 当前累计的连续失败次数(用于降级)
private static volatile Integer degradeNum = 0;
// 当前累计的连续成功次数(用于恢复)
private static volatile Integer reachNum = 0;
// 基于 Guava 的事件总线,用于发布和监听降级相关事件
private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
// 定时任务线程池,只会执行降级检查任务
private static volatile ScheduledThreadPoolExecutor executor;
/**
* 开启降级检查的定时任务
*/
private static void startDegradeCheck() {
if (!ATOMIC_DEGRADE_CHECK.compareAndSet(false, true)) {
return;
}
if (executor != null && !executor.isShutdown()) {
return;
}
executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
executor.scheduleAtFixedRate(() -> {
if (ATOMIC_DEGRADE_CHECK.get()) {
try {
// 开启一个空的全局事务
String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
// 马上提交全局事务
TransactionManagerHolder.get().commit(xid);
EVENT_BUS.post(new DegradeCheckEvent(true));
} catch (Exception e) {
EVENT_BUS.post(new DegradeCheckEvent(false));
}
}
}, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
}
/**
* DegradeCheckEvent事件触发,统计降级相关数据
*/
@Subscribe
public static void onDegradeCheck(DegradeCheckEvent event) { // 10
if (event.isRequestSuccess()) { // 成功
if (degradeNum >= degradeCheckAllowTimes) { // 已经处于降级状态
// 统计连续成功次数
reachNum++;
if (reachNum >= degradeCheckAllowTimes) { // 连续成功次数达到阈值 → 恢复全局事务
reachNum = 0;
degradeNum = 0;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("the current global transaction has been restored");
}
}
} else if (degradeNum != 0) { // 不在降级状态直接重置degradeNum,因为不连续了
degradeNum = 0;
}
} else { // 失败
if (degradeNum < degradeCheckAllowTimes) { // 还没降级
// 增加失败计数
degradeNum++;
if (degradeNum >= degradeCheckAllowTimes) { // 连续失败次数达到阈值 → 会启用降级模式,这个做个日志
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("the current global transaction has been automatically downgraded");
}
}
} else if (reachNum != 0) { // 已降级,但之前有成功累计 → 重置成功次数
reachNum = 0;
}
}
}
分布式事务的执行
@GlobalTransactional 的核心逻辑封装在TransactionalTemplate#execute方法中,其整体设计思路明显借鉴了 Spring 的事务处理模式。可以将其类比理解为:
- 解析注解信息,将
@GlobalTransactional的属性组装为TransactionInfo(类似 Spring 的TransactionAttribute) - 基于该信息创建并绑定
GlobalTransaction,并应用对应的传播行为(相当于 Spring 的TransactionStatus) - 执行业务逻辑,根据是否抛出异常决定调用 rollback 还是 commit
- 恢复执行上下文,确保支持 REQUIRES_NEW 和 NOT_SUPPORTED 这类传播策略
整体流程可以概括为:构建事务上下文 → 执行业务逻辑 → 结束事务 → 恢复现场,与 Spring 事务的核心思想是一致的,只是这里针对的是 分布式全局事务。我们先有个整体印象,然后再详细分析关键步骤的细节
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 获取事务信息(事务传播、超时时间、隔离级别等)
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 获取当前线程绑定的全局事务(不存在则表示是初次,则为发起者,否则为参与者)
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
if (existingTransaction(tx)) { // 存在事务就先暂停,以非全局事务来处理
suspendedResourcesHolder = tx.suspend(false);
}
return business.execute();
case REQUIRES_NEW:
// 存在事务就先暂停,开启一个新的全局事务
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend(false);
}
tx = GlobalTransactionContext.createNew();
break;
case SUPPORTS:
// 存在全局事务就加入,不存在就以非全局事务的方式执行
if (notExistingTransaction(tx)) {
return business.execute();
}
break;
case REQUIRED:
// 如果没有全局事务,则创建一个;否则使用当前事务
tx = GlobalTransactionContext.getCurrentOrCreate();
break;
case NEVER:
// 如果存在事务,直接抛异常
if (existingTransaction(tx)) {
throw new TransactionException(
String.format(
"Existing transaction found for transaction marked with propagation 'never', xid = %s",
tx.getXid()));
} else {
return business.execute();
}
case MANDATORY:
// 必须有全局事务,否则抛异常
if (notExistingTransaction(tx)) {
throw new TransactionException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
// ======================= 进入事务逻辑 =======================
try {
// 利用TM开启全局事务(只有全局事务的发起者开可以开启),注册到server端并触发对应的的hook
beginTransaction(txInfo, tx);
Object rs;
try {
// 执行业务代码
rs = business.execute();
// =============== 往下走就算是二阶段的任务了,准备全局事务的提交或回滚 =====================
} catch (Throwable ex) {
// 业务异常触发全局事务回滚
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 业务执行成功 → 提交全局事务(只有事务发起者能提交)
commitTransaction(tx, txInfo);
return rs;
} finally {
// 全局事务配置(锁重试次数和锁重试时间间隔)回退(回退到上一个事务)或清除
resumeGlobalLockConfig(previousConfig);
// afterCompletion的钩子函数
triggerAfterCompletion();
// 钩子函数的clean
cleanUp();
}
} finally {
// 上一个事务的resume,很简单,重新绑定上一个xid就行
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
XID
GlobalTransactionContext#getCurrent 用于获取当前全局事务信息,内部逻辑很简单:根据当前线程上下文是否存在 XID 来判断是 全局事务发起者 还是 参与者
XID 在整个分布式事务链路中起到类似 traceId 的作用,必须传递给下游服务,才能将多个微服务串联成一个完整的全局事务。Seata 的 integration 模块中实现了主流 RPC 框架的 XID 传播机制,例如 Dubbo 对应的 AlibabaDubboTransactionPropagationFilter
Launcher(全局事务发起者)
作为全局事务的起点,如果线程上下文不存在 XID 时,才有资格创建新的全局事务。发起者负责开启全局事务,并在业务逻辑跑完后,根据是否抛异常来决定提交还是回滚
Participant(全局事务参与者)
处于微服务调用链的中下层,参与到了其他上游服务发起的全局事务(这时线程上下文存在xid)。对应 Spring 的 加入已有事务传播行为。所以,全局事务的参与者不能触发全局事务的回滚或提交,它能做的就是通过 RM 报告当前分支事务的执行结果。如果执行失败,会把状态标记成 PhaseOne_Failed,等全局回滚的时候一起处理
全局事务开启
全局事务是通过 GlobalTransaction#begin 来启动的。内部由TM构建一个 GlobalBeginRequest 请求发到 TC,TC 处理完成后会回传一个 XID,客户端拿到这个 XID 后绑定到当前线程上下文,用于后续分支事务的关联
server端的TC 收到请求后,最终会走到 DefaultCore#begin 方法,核心逻辑可以简单概括为:
创建 GlobalSession:初始化一个全局事务对象,并生成唯一的 XID
存储 GlobalSession
通过不同的SessionLifecycleListener实现类来选择存储策略。以DataBaseSessionManager为例,会将GlobalSession转化为GlobalTransactionDO数据结构,并插入到global_table表中。至此全局事务信息完成持久化
回传 XID 给客户端
// DefaultCore#begin
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 创建globalSession
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 开启session,根据不同的策略(file、db、redis)将session保存起来
session.begin();
// transaction start event
MetricsPublisher.postSessionDoingEvent(session, false);
return session.getXid();
}
执行业务代码
分布式事务要管的核心,其实就是数据库的读写操作。根据上一篇文章的分析,Seata通过代理DataSource、Connection、PreparedStatement来插入分布式事务逻辑。不过 DataSource 代理在事务期间主要是为了提供 ConnectionProxy,因此真正的重点在 ConnectionProxy 和 PreparedStatementProxy,这两个才是实现核心逻辑的关键角色
PreparedStatementProxy
PreparedStatementProxy 在执行 SQL 时,会借助 Druid 等工具解析当前业务的 DML 语句,然后根据 SQL 类型选择对应的 AbstractDMLBaseExecutor 子类,并最终调用 Executor#execute 完成执行。SQL 类型和执行器映射关系大致如下:
- INSERT → 不同的数据库有不同的实现,MYSQL为MySQLInsertExecutor:其beforeImage为空实现,此时数据都还没有的
- UPDATE →
UpdateExecutor:最常用的一种Executor,前后镜像都要执行SELECT - DELETE →
DeleteExecutor:其afterImage为空实现,因为数据都删了 - SELECT_FOR_UPDATE →
SelectForUpdateExecutor:全局事务读已提交功能的核心实现 - INSERT_ON_DUPLICATE_UPDATE → MYSQL为
MySQLInsertOnDuplicateUpdateExecutor - UPDATE_JOIN → MYSQL为
MySQLUpdateJoinExecutor - SELECT →
PlainExecutor:这种 SQL 不涉及数据修改,不会生成前后镜像,也不会构建undo_log,直接执行原 SQL
AbstractDMLBaseExecutor
AbstractDMLBaseExecutor是Seata 处理 DML 语句(INSERT、UPDATE、DELETE)的核心抽象类,它定义了执行 SQL 前后获取镜像数据的模板方法:
- beforeImage():在SQL 之前,查询旧数据保存为TableRecords
- afterImage():在SQL之后,查询新数据保存为TableRecords
这两份TableRecords是后续构建undo_log和lockKey基础:
- undo_log:全局事务回滚时用于执行反向补偿
- lockKey:用于全局锁控制,避免不同的全局事务在结束前并发修改同一行数据
其核心方法如下,execute()作为整体的入口,最终都会走到 executeAutoCommitFalse(),在这里完成 SQL 执行、镜像采集,以及 undo_log 和 lockKey 的生成
/**
* 入口
*/
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) { // 存在全局事务,将全局事务id设置到ConnectionProxy中
statementProxy.getConnectionProxy().bind(xid);
}
// @GlobalLock注解支持
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
}
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
if (connectionProxy.getAutoCommit()) { // 当前事务为自动提交(代表没有主动开启事务)
// 一般就是只用了分布式事务但没有使用本地事务的情况。
// 这时每条dml都会被当作分支事务来执行,即执行完后都会操作seata的commit来注册分支事务(增加了rpc通信压力)
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
// 执行所有SQL前修改autoCommit为false,即手动提交事务
connectionProxy.changeAutoCommit();
// 锁重试机制来commit
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args); // 走非自动提交事务的commit逻辑(准备undo日志)
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it
// here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
/**
* 准备执行SQL了
*/
protected T executeAutoCommitFalse(Object[] args) throws Exception {
try {
// 1. 记录sql执行前镜像
TableRecords beforeImage = beforeImage();
// 2. 执行业务sql
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
// 3. 记录sql执行后的镜像
TableRecords afterImage = afterImage(beforeImage);
// 4. 准备undo_log(只是准备,还未写表的)
prepareUndoLog(beforeImage, afterImage);
return result;
} catch (TableMetaException e) {
LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
e.getTableName(), e.getColumnName());
statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
throw e;
}
}
/**
* 准备undo_log数据
*/
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
return;
}
// update语句的前后镜像Records数量一定要一样,Seata是不允许Update主键数据的
if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
if (beforeImage.getRows().size() != afterImage.getRows().size()) {
throw new ShouldNeverHappenException(
"Before image size is not equaled to after image size, probably because you updated the primary keys.");
}
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
// 准备先构建lockKey,可以映射到具体变化的某一行数据
// DELETE作为删除操作,只有beforeImage可以定位到lockKey
// 其余的update、insert都可以用afterImage来定位lockKey
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
// 构建lock
String lockKeys = buildLockKey(lockKeyRecords);
if (null != lockKeys) {
// 将lockKey和undo_log数据都先暂存起来
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
}
/**
* 锁冲突回滚策略
*/
private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
LockRetryPolicy(final ConnectionProxy connection) {
super(connection);
}
@Override
public <T> T execute(Callable<T> callable) throws Exception {
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
return doRetryOnLockConflict(callable);
} else {
return callable.call();
}
}
/**
* 出现其他异常,则操作origin connection进行真正的回滚
*/
@Override
protected void onException(Exception e) throws Exception {
ConnectionContext context = connection.getContext();
//UndoItems can't use the Set collection class to prevent ABA
context.removeSavepoint(null);
connection.getTargetConnection().rollback();
}
public static boolean isLockRetryPolicyBranchRollbackOnConflict() {
return LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT;
}
}
非本地事务执行机制
注意在 AbstractDMLBaseExecutor#doExecute 方法里会把事务的自动提交改成手动提交,为什么呢?
假设你在一个分布式事务里,只用了 Seata 的
@GlobalTransactional注解,而没用 Spring 的@Transactional控制本地事务,其实 Seata 也是支持的这种情况下,如果没有本地事务,JDBC 默认是 autoCommit=true,也就是每条 SQL 执行完就直接提交事务。问题是如果这样做,Seata 就没法保证 业务 SQL 和 undo_log 写入的原子性
所以 Seata 在这里会先判断:如果当前连接是自动提交的,就把它改成手动提交(
autoCommit=false)。这样能确保它先完成这几个步骤: 镜像 SQL 准备 → 业务 SQL 执行 → undo_log 构建 → 注册分支事务 → undo_log 落库
最后再统一commit,这样业务 SQL 和 undo_log 写入就是一个原子操作,不会出问题再说分支事务:
- 如果代码里开启了本地事务,那么整个本地事务算一个分支事务,共享同一个 XID
- 如果没开本地事务,那每一条
update、delete、insert都会被 Seata 当成一个分支事务,由它来管全局一致性所以只用 Seata,不开本地事务,也是可以的,但性能会差不少。为什么?因为每个分支事务都得跟 TC 通信、做数据库锁检查、写 undo_log,成本就变得很高了
ConnectionProxy
ConnectionProxy 是一阶段的核心控制器,它承载了分支事务的注册、锁冲突处理和重试、undo_log 落库以及事务提交/回滚等核心逻辑,我们依次来拆解一下这些功能,看看它们到底做了什么
public class ConnectionProxy extends AbstractConnectionProxy {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxy.class);
private final ConnectionContext context = new ConnectionContext();
private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
private static final int REPORT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.CLIENT_REPORT_RETRY_COUNT, DEFAULT_CLIENT_REPORT_RETRY_COUNT);
/**
* 默认false
*/
public static final boolean IS_REPORT_SUCCESS_ENABLE = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.CLIENT_REPORT_SUCCESS_ENABLE, DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE);
public void checkLock(String lockKeys) throws SQLException {
if (StringUtils.isBlank(lockKeys)) {
return;
}
try {
// 仅检查,不上锁
boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
if (!lockable) { // 锁冲突,直接抛LockConflictException让上层去处理
throw new LockConflictException(String.format("get lock failed, lockKey: %s", lockKeys));
}
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, lockKeys);
}
}
public boolean lockQuery(String lockKeys) throws SQLException {
boolean result = false;
try {
result = DefaultResourceManager.get().lockQuery(BranchType.AT, getDataSourceProxy().getResourceId(),
context.getXid(), lockKeys);
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, lockKeys);
}
return result;
}
/**
* LockConflictException异常转换
*/
private void recognizeLockKeyConflictException(TransactionException te) throws SQLException {
recognizeLockKeyConflictException(te, null);
}
private void recognizeLockKeyConflictException(TransactionException te, String lockKeys) throws SQLException {
if (te.getCode() == TransactionExceptionCode.LockKeyConflict
|| te.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
StringBuilder reasonBuilder = new StringBuilder("get global lock fail, xid:");
reasonBuilder.append(context.getXid());
if (StringUtils.isNotBlank(lockKeys)) {
reasonBuilder.append(", lockKeys:").append(lockKeys);
}
throw new LockConflictException(reasonBuilder.toString(), te.getCode());
} else {
// 转换为SQLException用于后续的回滚
throw new SQLException(te);
}
}
/**
* 提交事务
*/
@Override
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
// 说明是Spring主动开启了事务,这里才操作回滚
// 因为如果是Seata帮我们开启的本地事务(即设置autoCommit=false),会在AbstractDMLBaseExecutor$LockRetryPolicy#onException里进行回滚
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit(); // 会注册分支事务到server端,并对修改的数据上锁(所以,至少会操作server端的两张表,branch_session和lock_table表)
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks(); // 速度快,因为仅对修改的数据进行锁检查(不上锁)。适合不需要使用事务但不能脏读(读到其他全局事务未整体提交但分支事务已提交的场景)的场景使用
} else {
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
// 注册分支事务,对当前分支事务修改的数据进行上锁(netty通信,server端上锁到lock_table表)
register();
} catch (TransactionException e) { // 锁冲突异常判断(发生了锁冲突会重新抛出LockConflictException)
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// 刷新undo_log到数据库
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
// 当前分支事务真正提交
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
private void processLocalCommitWithGlobalLocks() throws SQLException {
// 检查锁资源,存在其他事务上锁了就直接抛LockConflictException异常,进行锁重试
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
private void register() throws TransactionException {
// undo_log或lockKey不存在,表示当前事务没有数据修改(可能只有select操作),也就不需要注册为分支事务了
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(),
context.buildLockKeys());
context.setBranchId(branchId);
}
@Override
public void rollback() throws SQLException {
targetConnection.rollback();
if (context.inGlobalTransaction() && context.isBranchRegistered()) { // 身在全局事务中且当前事务已注册为分支事务才有资格report
report(false);
}
context.reset();
}
/**
* change connection autoCommit to false by seata
* <p/>
* 修改当前Connection为非自动提交事务
*
* @throws SQLException the sql exception
*/
public void changeAutoCommit() throws SQLException {
getContext().setAutoCommitChanged(true);
setAutoCommit(false);
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
// change autocommit from false to true, we should commit() first according to
// JDBC spec.
doCommit();
}
targetConnection.setAutoCommit(autoCommit);
}
/**
* 根据commitDone向TC报告分支事务的状态
* TC也仅修改分支事务的状态,不会做回滚操作
*/
private void report(boolean commitDone) throws SQLException {
if (context.getBranchId() == null) {
return;
}
int retry = REPORT_RETRY_COUNT;
while (retry > 0) {
try {
DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
return;
} catch (Throwable ex) {
LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
+ commitDone + "] Retry Countdown: " + retry);
retry--;
if (retry == 0) {
throw new SQLException("Failed to report branch status " + commitDone, ex);
}
}
}
}
/**
* LockRetryPolicy:用于处理锁冲突时的重试策略。
*
* 在分布式事务场景下,数据写操作可能因锁冲突而失败,此类提供重试机制来提升成功率。
*/
public static class LockRetryPolicy {
/**
* 是否开启 锁冲突后分支事务回滚 策略
* 如果为 true,则表示在锁冲突时直接回滚分支事务,而不再进行多次重试。
*/
protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory
.getInstance().getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT);
protected final ConnectionProxy connection;
public LockRetryPolicy(ConnectionProxy connection) {
this.connection = connection;
}
public <T> T execute(Callable<T> callable) throws Exception {
/*
不需要重试的机制如下:
1、开启了锁冲突回滚策略
2、当前Connection的自动提交改变过(由true变为false),这种情况已经单独处理了,就不需要再重试了
*/
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && connection.getContext().isAutoCommitChanged()) {
return callable.call();
} else {
// 重试
return doRetryOnLockConflict(callable);
}
}
/**
* 锁冲突重试
*/
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) { // 所冲突异常捕获
onException(lockConflict);
if (connection.getContext().isAutoCommitChanged()
&& lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
}
// sleep后重试,默认重试30次,间隔时间为10毫秒
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
// 其他非锁冲突异常,不重试,直接抛出
onException(e);
throw e;
}
}
}
protected void onException(Exception e) throws Exception {
}
}
}
注册分支事务和锁检查
在 ConnectionProxy#register 方法里,会通过 RM 的 branchRegister 向 TC 发送一条 BranchRegisterRequest 消息,请求注册分支事务。TC 端的处理逻辑主要在 AbstractCore#branchRegister 里,代码很多我就不贴了,直接说流程:
- 将 BranchRegisterRequest 转成 BranchSession 对象
- 尝试加锁(AT 模式才需要)
- 会检查锁并调用
LockStoreDataBaseDAO#acquireLock去操作lock_table
- 会检查锁并调用
- 加锁逻辑
- 查询
lock_table是否已有对应的row_key,即行锁(每个row_key唯一标识数据库某张表的某行)- 没查到:说明没人占,直接插入 lock_table,加锁成功。
- 查到了,且 xid 一致:说明是当前全局事务的重复加锁,只需补充缺失的锁记录,依然算成功
- 查到了,xid 不一致:说明被别的全局事务占用,返回加锁失败,直接抛
LockKeyConflict异常(非常重要,客户端会靠它判断是否重试)
- 查询
- 加锁成功后的处理
- 把 BranchSession 转成 BranchTransactionDO
- 通过 SessionLifecycleListener 持久化(db 模式会写入
branch_table) - 返回
branchId给客户端
- 加锁失败
- 抛出
BranchTransactionException,异常码LockKeyConflict - 由 AbstractCallback 捕获并转成响应消息,发回客户端
- 抛出
锁冲突重试
当 ConnectionProxy#recognizeLockKeyConflictException 方法发现是 LockKeyConflict 异常时,会再包装成 LockConflictException 抛出去。接着,LockRetryPolicy 会接手,按重试策略继续尝试执行 SQL。默认策略是:重试 30 次,每次间隔 10ms(这些值受 GlobalLockConfig配置影响,可以调整)
如果 30 次重试之后还是抢不到锁,说明冲突一直没解决,那就直接触发 本地事务回滚,整个业务 SQL 失败收场
刷新undo_log
分支事务注册成功后,就可以把之前暂存的 Undo Log 真正写进数据库的 undo_log 表,确保如果二阶段回滚发生,可以通过它恢复数据
本地事务真正提交
该做的事都做完了:分支事务注册搞定、Undo Log 入库也搞定,这时候就可以安全地提交本地事务了
rollback
一阶段的回滚逻辑很简单,核心就两件事:
- 回滚本地事务
- 调用 RM → TC 的 branchReport 方法,把分支异常状态上报。TC 接到后只是把对应分支事务的状态改成 PhaseOne_Failed,并不会触发全局回滚
一阶段简单总结
一阶段的核心,就是把 undo_log 准备好,并保证 业务 SQL + undo_log + 分支事务注册 的原子性,这样二阶段的全局提交或回滚才能顺利进行。二阶段的具体操作,我们留到下一篇再详细讲