Seata — AT模式一阶段全解析

​ 基于 Seata 1.7.1 源码,本篇文章详细拆解了一阶段分布式事务的执行流程,重点包括 降级检查策略undo log 的生成与管理非本地事务运行模式下的支持锁冲突检测及重试机制,以及一阶段事务如何决定提交或回滚,为理解 AT 模式的核心原理提供了全面解析

​ Seata 的 AT 模式本质上就是在一阶段做足准备,二阶段才能高效提交或回滚。我们从@GlobalTransactional 入手,来看看 GlobalTransactionalInterceptor 在分布式事务的一阶段里到底做了什么

降级检查

GlobalTransactionalInterceptor 会在方法执行前先判断是否处于降级状态,如果已经降级,就直接跳过全局事务逻辑,改走本地事务,避免全局事务挂掉时影响业务。其降级检查相关参数和方法如下,核心逻辑如下:

  1. 启动定时任务,定期做可用性检测

    ​ 在 GlobalTransactionalInterceptor 构造方法里,会调用 startDegradeCheck(),开启一个定时任务。这个任务每隔一段时间,创建一个空的全局事务并马上提交,以此测试 TC 是否正常

    ​ 另外,业务中的全局事务执行完成后,也会根据执行结果发布DegradeCheckEvent事件

  2. 根据事件统计状态

    ​ 通过监听 DegradeCheckEvent,统计连续失败次数degradeNum和连续成功次数reachNum

    • 降级:失败次数达到degradeCheckAllowTimes(默认 10 次)
    • 恢复:连续成功达到degradeCheckAllowTimes(同样是 10 次)
// 降级检查任务执行的时间间隔(单位:毫秒)
// 仅当 degradeCheck = true 时有效,默认值为 2000 毫秒
private static int degradeCheckPeriod;
// 是否启用降级检查策略的动态开关
// 受配置中心的client.tm.degradeCheck参数控制
private static final AtomicBoolean ATOMIC_DEGRADE_CHECK = new AtomicBoolean(false);
// 降级检查失败允许的最大次数
// 当连续失败次数 >= 此值时,会触发事务模式降级(从全局事务退化为本地事务)
// 仅当 degradeCheck = true 时有效,默认值为 10
private static int degradeCheckAllowTimes;
// 当前累计的连续失败次数(用于降级)
private static volatile Integer degradeNum = 0;
// 当前累计的连续成功次数(用于恢复)
private static volatile Integer reachNum = 0;
// 基于 Guava 的事件总线,用于发布和监听降级相关事件
private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
// 定时任务线程池,只会执行降级检查任务
private static volatile ScheduledThreadPoolExecutor executor;


/**
 * 开启降级检查的定时任务
 */
private static void startDegradeCheck() {
    if (!ATOMIC_DEGRADE_CHECK.compareAndSet(false, true)) {
        return;
    }
    if (executor != null && !executor.isShutdown()) {
        return;
    }
    executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));
    executor.scheduleAtFixedRate(() -> {
        if (ATOMIC_DEGRADE_CHECK.get()) {
            try {
                // 开启一个空的全局事务
                String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                // 马上提交全局事务
                TransactionManagerHolder.get().commit(xid);
                EVENT_BUS.post(new DegradeCheckEvent(true));
            } catch (Exception e) {
                EVENT_BUS.post(new DegradeCheckEvent(false));
            }
        }
    }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
}

/**
 * DegradeCheckEvent事件触发,统计降级相关数据
 */
@Subscribe
public static void onDegradeCheck(DegradeCheckEvent event) { // 10
    if (event.isRequestSuccess()) { // 成功
        if (degradeNum >= degradeCheckAllowTimes) { // 已经处于降级状态
            // 统计连续成功次数
            reachNum++;
            if (reachNum >= degradeCheckAllowTimes) { // 连续成功次数达到阈值 → 恢复全局事务
                reachNum = 0;
                degradeNum = 0;
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("the current global transaction has been restored");
                }
            }
        } else if (degradeNum != 0) { // 不在降级状态直接重置degradeNum,因为不连续了
            degradeNum = 0;
        }
    } else { // 失败
        if (degradeNum < degradeCheckAllowTimes) { // 还没降级
            // 增加失败计数
            degradeNum++;
            if (degradeNum >= degradeCheckAllowTimes) { // 连续失败次数达到阈值 → 会启用降级模式,这个做个日志
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("the current global transaction has been automatically downgraded");
                }
            }
        } else if (reachNum != 0) { // 已降级,但之前有成功累计 → 重置成功次数
            reachNum = 0;
        }
    }
}

分布式事务的执行

@GlobalTransactional 的核心逻辑封装在TransactionalTemplate#execute方法中,其整体设计思路明显借鉴了 Spring 的事务处理模式。可以将其类比理解为:

  1. 解析注解信息,将 @GlobalTransactional 的属性组装为 TransactionInfo(类似 Spring 的 TransactionAttribute
  2. 基于该信息创建并绑定 GlobalTransaction,并应用对应的传播行为(相当于 Spring 的 TransactionStatus
  3. 执行业务逻辑,根据是否抛出异常决定调用 rollback 还是 commit
  4. 恢复执行上下文,确保支持 REQUIRES_NEWNOT_SUPPORTED 这类传播策略

​ 整体流程可以概括为:构建事务上下文 → 执行业务逻辑 → 结束事务 → 恢复现场,与 Spring 事务的核心思想是一致的,只是这里针对的是 分布式全局事务。我们先有个整体印象,然后再详细分析关键步骤的细节

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. 获取事务信息(事务传播、超时时间、隔离级别等)
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 获取当前线程绑定的全局事务(不存在则表示是初次,则为发起者,否则为参与者)
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                if (existingTransaction(tx)) { // 存在事务就先暂停,以非全局事务来处理
                    suspendedResourcesHolder = tx.suspend(false);
                }
                return business.execute();
            case REQUIRES_NEW:
                // 存在事务就先暂停,开启一个新的全局事务
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                tx = GlobalTransactionContext.createNew();
                break;
            case SUPPORTS: 
                // 存在全局事务就加入,不存在就以非全局事务的方式执行
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                break;
            case REQUIRED:
                // 如果没有全局事务,则创建一个;否则使用当前事务
                tx = GlobalTransactionContext.getCurrentOrCreate();
                break;
            case NEVER:
                // 如果存在事务,直接抛异常
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                            String.format(
                                    "Existing transaction found for transaction marked with propagation 'never', xid = %s",
                                    tx.getXid()));
                } else {
                    return business.execute();
                }
            case MANDATORY:
                // 必须有全局事务,否则抛异常
                if (notExistingTransaction(tx)) {
                    throw new TransactionException(
                            "No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        // ======================= 进入事务逻辑 =======================
        try {
            // 利用TM开启全局事务(只有全局事务的发起者开可以开启),注册到server端并触发对应的的hook
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // 执行业务代码
                rs = business.execute();
                // =============== 往下走就算是二阶段的任务了,准备全局事务的提交或回滚 =====================
            } catch (Throwable ex) {
                // 业务异常触发全局事务回滚
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 业务执行成功 → 提交全局事务(只有事务发起者能提交)
            commitTransaction(tx, txInfo);

            return rs;
        } finally {
            // 全局事务配置(锁重试次数和锁重试时间间隔)回退(回退到上一个事务)或清除
            resumeGlobalLockConfig(previousConfig);
            // afterCompletion的钩子函数
            triggerAfterCompletion();
            // 钩子函数的clean
            cleanUp();
        }
    } finally {
        // 上一个事务的resume,很简单,重新绑定上一个xid就行
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

XID

GlobalTransactionContext#getCurrent 用于获取当前全局事务信息,内部逻辑很简单:根据当前线程上下文是否存在 XID 来判断是 全局事务发起者 还是 参与者

XID 在整个分布式事务链路中起到类似 traceId 的作用,必须传递给下游服务,才能将多个微服务串联成一个完整的全局事务。Seata 的 integration 模块中实现了主流 RPC 框架的 XID 传播机制,例如 Dubbo 对应的 AlibabaDubboTransactionPropagationFilter

Launcher(全局事务发起者)

​ 作为全局事务的起点,如果线程上下文不存在 XID 时,才有资格创建新的全局事务。发起者负责开启全局事务,并在业务逻辑跑完后,根据是否抛异常来决定提交还是回滚

Participant(全局事务参与者)

​ 处于微服务调用链的中下层,参与到了其他上游服务发起的全局事务(这时线程上下文存在xid)。对应 Spring 的 加入已有事务传播行为。所以,全局事务的参与者不能触发全局事务的回滚或提交,它能做的就是通过 RM 报告当前分支事务的执行结果。如果执行失败,会把状态标记成 PhaseOne_Failed,等全局回滚的时候一起处理

全局事务开启

​ 全局事务是通过 GlobalTransaction#begin 来启动的。内部由TM构建一个 GlobalBeginRequest 请求发到 TC,TC 处理完成后会回传一个 XID,客户端拿到这个 XID 后绑定到当前线程上下文,用于后续分支事务的关联

​ server端的TC 收到请求后,最终会走到 DefaultCore#begin 方法,核心逻辑可以简单概括为:

  1. 创建 GlobalSession:初始化一个全局事务对象,并生成唯一的 XID

  2. 存储 GlobalSession

    ​ 通过不同的SessionLifecycleListener实现类来选择存储策略。以DataBaseSessionManager为例,会将GlobalSession转化为GlobalTransactionDO数据结构,并插入到global_table表中。至此全局事务信息完成持久化

  3. 回传 XID 给客户端

// DefaultCore#begin
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // 创建globalSession
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        // 开启session,根据不同的策略(file、db、redis)将session保存起来
        session.begin();

        // transaction start event
        MetricsPublisher.postSessionDoingEvent(session, false);

        return session.getXid();
    }

执行业务代码

​ 分布式事务要管的核心,其实就是数据库的读写操作。根据上一篇文章的分析,Seata通过代理DataSourceConnectionPreparedStatement来插入分布式事务逻辑。不过 DataSource 代理在事务期间主要是为了提供 ConnectionProxy,因此真正的重点在 ConnectionProxyPreparedStatementProxy,这两个才是实现核心逻辑的关键角色

PreparedStatementProxy

PreparedStatementProxy 在执行 SQL 时,会借助 Druid 等工具解析当前业务的 DML 语句,然后根据 SQL 类型选择对应的 AbstractDMLBaseExecutor 子类,并最终调用 Executor#execute 完成执行。SQL 类型和执行器映射关系大致如下:

  • INSERT → 不同的数据库有不同的实现,MYSQL为MySQLInsertExecutor:其beforeImage为空实现,此时数据都还没有的
  • UPDATE → UpdateExecutor最常用的一种Executor,前后镜像都要执行SELECT
  • DELETE → DeleteExecutor:其afterImage为空实现,因为数据都删了
  • SELECT_FOR_UPDATE → SelectForUpdateExecutor全局事务读已提交功能的核心实现
  • INSERT_ON_DUPLICATE_UPDATE → MYSQL为MySQLInsertOnDuplicateUpdateExecutor
  • UPDATE_JOIN → MYSQL为MySQLUpdateJoinExecutor
  • SELECTPlainExecutor:这种 SQL 不涉及数据修改,不会生成前后镜像,也不会构建 undo_log,直接执行原 SQL
AbstractDMLBaseExecutor

​ AbstractDMLBaseExecutor是Seata 处理 DML 语句(INSERT、UPDATE、DELETE)的核心抽象类,它定义了执行 SQL 前后获取镜像数据的模板方法

  • beforeImage():在SQL 之前,查询旧数据保存为TableRecords
  • afterImage():在SQL之后,查询新数据保存为TableRecords

这两份TableRecords是后续构建undo_log和lockKey基础:

  • undo_log全局事务回滚时用于执行反向补偿
  • lockKey:用于全局锁控制,避免不同的全局事务在结束前并发修改同一行数据

​ 其核心方法如下,execute()作为整体的入口,最终都会走到 executeAutoCommitFalse(),在这里完成 SQL 执行、镜像采集,以及 undo_log 和 lockKey 的生成

/**
 * 入口
 */
public T execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        if (xid != null) { // 存在全局事务,将全局事务id设置到ConnectionProxy中
            statementProxy.getConnectionProxy().bind(xid);
        }

        // @GlobalLock注解支持
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) { // 当前事务为自动提交(代表没有主动开启事务)
        // 一般就是只用了分布式事务但没有使用本地事务的情况。
        // 这时每条dml都会被当作分支事务来执行,即执行完后都会操作seata的commit来注册分支事务(增加了rpc通信压力)
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        // 执行所有SQL前修改autoCommit为false,即手动提交事务
        connectionProxy.changeAutoCommit();
        // 锁重试机制来commit
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args); // 走非自动提交事务的commit逻辑(准备undo日志)
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it
        // here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

/**
 * 准备执行SQL了
 */
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    try {
        // 1. 记录sql执行前镜像
        TableRecords beforeImage = beforeImage();
        // 2. 执行业务sql
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // 3. 记录sql执行后的镜像
        TableRecords afterImage = afterImage(beforeImage);
        // 4. 准备undo_log(只是准备,还未写表的)
        prepareUndoLog(beforeImage, afterImage);
        return result;
    } catch (TableMetaException e) {
        LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
                e.getTableName(), e.getColumnName());
        statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
        throw e;
    }
}

/**
 * 准备undo_log数据
 */
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
        return;
    }
    // update语句的前后镜像Records数量一定要一样,Seata是不允许Update主键数据的
    if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
        if (beforeImage.getRows().size() != afterImage.getRows().size()) {
            throw new ShouldNeverHappenException(
                    "Before image size is not equaled to after image size, probably because you updated the primary keys.");
        }
    }
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

    // 准备先构建lockKey,可以映射到具体变化的某一行数据
    // DELETE作为删除操作,只有beforeImage可以定位到lockKey
    // 其余的update、insert都可以用afterImage来定位lockKey
    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    // 构建lock
    String lockKeys = buildLockKey(lockKeyRecords);
    if (null != lockKeys) {
        // 将lockKey和undo_log数据都先暂存起来

        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}


/**
 * 锁冲突回滚策略
 */
private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {

    LockRetryPolicy(final ConnectionProxy connection) {
        super(connection);
    }

    @Override
    public <T> T execute(Callable<T> callable) throws Exception {
        if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
            return doRetryOnLockConflict(callable);
        } else {
            return callable.call();
        }
    }

    /**
     * 出现其他异常,则操作origin connection进行真正的回滚
     */
    @Override
    protected void onException(Exception e) throws Exception {
        ConnectionContext context = connection.getContext();
        //UndoItems can't use the Set collection class to prevent ABA
        context.removeSavepoint(null);
        connection.getTargetConnection().rollback();
    }

    public static boolean isLockRetryPolicyBranchRollbackOnConflict() {
        return LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT;
    }
}
非本地事务执行机制

​ 注意在 AbstractDMLBaseExecutor#doExecute 方法里会把事务的自动提交改成手动提交,为什么呢?

​ 假设你在一个分布式事务里,只用了 Seata 的 @GlobalTransactional 注解,而没用 Spring 的 @Transactional 控制本地事务,其实 Seata 也是支持的

这种情况下,如果没有本地事务,JDBC 默认是 autoCommit=true,也就是每条 SQL 执行完就直接提交事务。问题是如果这样做,Seata 就没法保证 业务 SQL 和 undo_log 写入的原子性

​ 所以 Seata 在这里会先判断:如果当前连接是自动提交的,就把它改成手动提交(autoCommit=false)。这样能确保它先完成这几个步骤:

镜像 SQL 准备 → 业务 SQL 执行 → undo_log 构建 → 注册分支事务 → undo_log 落库
最后再统一 commit,这样业务 SQL 和 undo_log 写入就是一个原子操作,不会出问题

再说分支事务:

  • 如果代码里开启了本地事务,那么整个本地事务算一个分支事务,共享同一个 XID
  • 如果没开本地事务,那每一条 updatedeleteinsert 都会被 Seata 当成一个分支事务,由它来管全局一致性

所以只用 Seata,不开本地事务,也是可以的,但性能会差不少。为什么?因为每个分支事务都得跟 TC 通信、做数据库锁检查、写 undo_log,成本就变得很高了

ConnectionProxy

​ ConnectionProxy 是一阶段的核心控制器,它承载了分支事务的注册、锁冲突处理和重试、undo_log 落库以及事务提交/回滚等核心逻辑,我们依次来拆解一下这些功能,看看它们到底做了什么

public class ConnectionProxy extends AbstractConnectionProxy {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxy.class);

    private final ConnectionContext context = new ConnectionContext();

    private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);

    private static final int REPORT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
            ConfigurationKeys.CLIENT_REPORT_RETRY_COUNT, DEFAULT_CLIENT_REPORT_RETRY_COUNT);

    /**
     * 默认false
     */
    public static final boolean IS_REPORT_SUCCESS_ENABLE = ConfigurationFactory.getInstance().getBoolean(
            ConfigurationKeys.CLIENT_REPORT_SUCCESS_ENABLE, DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE);

    public void checkLock(String lockKeys) throws SQLException {
        if (StringUtils.isBlank(lockKeys)) {
            return;
        }
        try {
            // 仅检查,不上锁
            boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
                    getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
            if (!lockable) { // 锁冲突,直接抛LockConflictException让上层去处理
                throw new LockConflictException(String.format("get lock failed, lockKey: %s", lockKeys));
            }
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, lockKeys);
        }
    }

    public boolean lockQuery(String lockKeys) throws SQLException {
        boolean result = false;
        try {
            result = DefaultResourceManager.get().lockQuery(BranchType.AT, getDataSourceProxy().getResourceId(),
                    context.getXid(), lockKeys);
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, lockKeys);
        }
        return result;
    }

    /**
     * LockConflictException异常转换
     */
    private void recognizeLockKeyConflictException(TransactionException te) throws SQLException {
        recognizeLockKeyConflictException(te, null);
    }

    private void recognizeLockKeyConflictException(TransactionException te, String lockKeys) throws SQLException {
        if (te.getCode() == TransactionExceptionCode.LockKeyConflict
                || te.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
            StringBuilder reasonBuilder = new StringBuilder("get global lock fail, xid:");
            reasonBuilder.append(context.getXid());
            if (StringUtils.isNotBlank(lockKeys)) {
                reasonBuilder.append(", lockKeys:").append(lockKeys);
            }
            throw new LockConflictException(reasonBuilder.toString(), te.getCode());
        } else {
            // 转换为SQLException用于后续的回滚
            throw new SQLException(te);
        }

    }

    /**
     * 提交事务
     */
   @Override
    public void commit() throws SQLException {
        try {
            lockRetryPolicy.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { 
                // 说明是Spring主动开启了事务,这里才操作回滚
                // 因为如果是Seata帮我们开启的本地事务(即设置autoCommit=false),会在AbstractDMLBaseExecutor$LockRetryPolicy#onException里进行回滚
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit(); // 会注册分支事务到server端,并对修改的数据上锁(所以,至少会操作server端的两张表,branch_session和lock_table表)
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks(); // 速度快,因为仅对修改的数据进行锁检查(不上锁)。适合不需要使用事务但不能脏读(读到其他全局事务未整体提交但分支事务已提交的场景)的场景使用
        } else {
            targetConnection.commit();
        }
    }

      private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 注册分支事务,对当前分支事务修改的数据进行上锁(netty通信,server端上锁到lock_table表)
            register();
        } catch (TransactionException e) { // 锁冲突异常判断(发生了锁冲突会重新抛出LockConflictException)
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            // 刷新undo_log到数据库
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            // 当前分支事务真正提交
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
        // 检查锁资源,存在其他事务上锁了就直接抛LockConflictException异常,进行锁重试
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

  

    private void register() throws TransactionException {
        // undo_log或lockKey不存在,表示当前事务没有数据修改(可能只有select操作),也就不需要注册为分支事务了
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }

        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), context.getApplicationData(),
                context.buildLockKeys());
        context.setBranchId(branchId);
    }

    @Override
    public void rollback() throws SQLException {
        targetConnection.rollback();
        if (context.inGlobalTransaction() && context.isBranchRegistered()) { // 身在全局事务中且当前事务已注册为分支事务才有资格report
            report(false);
        }
        context.reset();
    }

    /**
     * change connection autoCommit to false by seata
     * <p/>
     * 修改当前Connection为非自动提交事务
     *
     * @throws SQLException the sql exception
     */
    public void changeAutoCommit() throws SQLException {
        getContext().setAutoCommitChanged(true);
        setAutoCommit(false);
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
            // change autocommit from false to true, we should commit() first according to
            // JDBC spec.
            doCommit();
        }
        targetConnection.setAutoCommit(autoCommit);
    }

     /**
     * 根据commitDone向TC报告分支事务的状态
     * TC也仅修改分支事务的状态,不会做回滚操作
     */
    private void report(boolean commitDone) throws SQLException {
        if (context.getBranchId() == null) {
            return;
        }
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                        commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                        + commitDone + "] Retry Countdown: " + retry);
                retry--;

                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }

     /**
     * LockRetryPolicy:用于处理锁冲突时的重试策略。
     *
     * 在分布式事务场景下,数据写操作可能因锁冲突而失败,此类提供重试机制来提升成功率。
     */
    public static class LockRetryPolicy {
        /**
         * 是否开启 锁冲突后分支事务回滚 策略
         * 如果为 true,则表示在锁冲突时直接回滚分支事务,而不再进行多次重试。
         */
        protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory
            .getInstance().getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT);

        protected final ConnectionProxy connection;

        public LockRetryPolicy(ConnectionProxy connection) {
            this.connection = connection;
        }

        public <T> T execute(Callable<T> callable) throws Exception {
            /*
            不需要重试的机制如下:
                1、开启了锁冲突回滚策略
                2、当前Connection的自动提交改变过(由true变为false),这种情况已经单独处理了,就不需要再重试了
             */
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && connection.getContext().isAutoCommitChanged()) {
                return callable.call();
            } else {
                // 重试
                return doRetryOnLockConflict(callable);
            }
        }

        /**
         * 锁冲突重试
         */
        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) { // 所冲突异常捕获
                    onException(lockConflict);
                    if (connection.getContext().isAutoCommitChanged()
                        && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                        lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
                    }
                    // sleep后重试,默认重试30次,间隔时间为10毫秒
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    // 其他非锁冲突异常,不重试,直接抛出
                    onException(e);
                    throw e;
                }
            }
        }

        protected void onException(Exception e) throws Exception {
        }
    }
}
注册分支事务和锁检查

​ 在 ConnectionProxy#register 方法里,会通过 RM 的 branchRegister 向 TC 发送一条 BranchRegisterRequest 消息,请求注册分支事务。TC 端的处理逻辑主要在 AbstractCore#branchRegister 里,代码很多我就不贴了,直接说流程:

  1. 将 BranchRegisterRequest 转成 BranchSession 对象
  2. 尝试加锁(AT 模式才需要)
    • 会检查锁并调用 LockStoreDataBaseDAO#acquireLock 去操作 lock_table
  3. 加锁逻辑
    • 查询 lock_table 是否已有对应的 row_key,即行锁(每个 row_key 唯一标识数据库某张表的某行)
      • 没查到:说明没人占,直接插入 lock_table,加锁成功。
      • 查到了,且 xid 一致:说明是当前全局事务的重复加锁,只需补充缺失的锁记录,依然算成功
      • 查到了,xid 不一致:说明被别的全局事务占用,返回加锁失败,直接抛 LockKeyConflict 异常(非常重要,客户端会靠它判断是否重试)
  4. 加锁成功后的处理
    • 把 BranchSession 转成 BranchTransactionDO
    • 通过 SessionLifecycleListener 持久化(db 模式会写入 branch_table
    • 返回 branchId 给客户端
  5. 加锁失败
    • 抛出 BranchTransactionException,异常码 LockKeyConflict
    • 由 AbstractCallback 捕获并转成响应消息,发回客户端
锁冲突重试

​ 当 ConnectionProxy#recognizeLockKeyConflictException 方法发现是 LockKeyConflict 异常时,会再包装成 LockConflictException 抛出去。接着,LockRetryPolicy 会接手,按重试策略继续尝试执行 SQL。默认策略是:重试 30 次,每次间隔 10ms(这些值受 GlobalLockConfig配置影响,可以调整)

​ 如果 30 次重试之后还是抢不到锁,说明冲突一直没解决,那就直接触发 本地事务回滚,整个业务 SQL 失败收场

刷新undo_log

​ 分支事务注册成功后,就可以把之前暂存的 Undo Log 真正写进数据库的 undo_log 表,确保如果二阶段回滚发生,可以通过它恢复数据

本地事务真正提交

​ 该做的事都做完了:分支事务注册搞定、Undo Log 入库也搞定,这时候就可以安全地提交本地事务了

rollback

​ 一阶段的回滚逻辑很简单,核心就两件事:

  • 回滚本地事务
  • 调用 RM → TC 的 branchReport 方法,把分支异常状态上报。TC 接到后只是把对应分支事务的状态改成 PhaseOne_Failed,并不会触发全局回滚

一阶段简单总结

​ 一阶段的核心,就是把 undo_log 准备好,并保证 业务 SQL + undo_log + 分支事务注册 的原子性,这样二阶段的全局提交或回滚才能顺利进行。二阶段的具体操作,我们留到下一篇再详细讲

相关链接