Seata — AT模式二阶段全解析

​ 本文深入解析了 Seata AT 模式下全局事务的提交与回滚机制,详细讲解了 GlobalTransaction 提交全局事务分支事务的异步提交以及 全局回滚的执行流程。重点探讨了 事务协调器(TC)和资源管理器(RM)如何协作完成事务提交与回滚。特别是在 并发控制与数据一致性 问题上,展示了 Seata 如何通过 UndoLog校验 机制避免“脏写”问题,并实现强一致性的全局事务回滚

前置文章

全局事务的提交

​ 当业务正常执行完,没有抛异常时,就会进入 GlobalTransaction#commit 方法,开始全局事务的提交。在这个方法里,有两个重点要注意:

  • 参与者不能主动提交事务:只有全局事务发起者才能触发全局提交
  • 提交失败会重试:如果全局提交失败,Seata 默认会自动重试 5 次,尽量保证提交能成功完成
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        // 全局事物的参与者 是不能提交的
        return;
    }
    assertXIDNotNull();
    // 事务提交失败的重试次数:默认5次
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                retry--;
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(),
                        retry, ex.getMessage());
                // 不能再重试了,才真正抛出异常
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend(true);
        }
    }

}

TC处理commit

​ 最终提交动作由 TM 通知 TC 完成,TransactionManager#commit 会发一个 GlobalCommitRequest 给 TC,由 TC 来负责全局事务的提交。

​ 回顾前面的流程,分支事务在注册后,本地数据其实已经提交并落库,undo_log 也没用了。所以客户端提交时,主要任务就是清理 undo_log

​ 服务端这边涉及 global_tablebranch_tablelock_table 三类数据。真正影响并发的是 lock_table,因为它占用全局锁,可能阻塞其他全局事务,所以它必须同步删除。而 undo_logglobal_tablebranch_table 这些不会影响其他事务,可以异步清理

​ 换句话说,Seata 二阶段的大部分操作是异步的,只保留必要的同步动作(删锁),这也是它性能快的原因之一

DefaultCore#commit

​ 服务端TC处理提交在io.seata.server.coordinator.DefaultCore#commit方法里,分析一下它的重点流程

  1. 幂等控制

    只有全局事务状态是 GlobalStatus.Begin 才允许提交,避免重复操作

  2. GlobalSession的关闭和释放

    调用 globalSession.closeAndClean(),内部通过 LockManager#releaseGlobalSessionLock 删除对应 xidlock_table 数据,释放占用的全局锁资源

  3. 提交逻辑

    • 异步处理

      ​ AT模式支持异步提交,asyncCommit()方法仅将GlobalTransactionDO的状态修改为AsyncCommitting,由定时任务去拉取这个状态的数据再处理后续的操作

    • 同步处理

      如果不能异步(比如 TCC、SAGA 模式),则走 doGlobalCommit() 立即提交

  4. 返回结果

    • 异步提交:直接返回 Committed,对外表示事务结束

    • 同步提交:根据 doGlobalCommit() 的结果返回实际状态

public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) { // 没找到,说明全局事务已经结束被删除了
        return GlobalStatus.Finished;
    }

    if (globalSession.isTimeout()) {
        return GlobalStatus.TimeoutRollbacking;
    }

    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 加锁处理,避免并发修改
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        if (globalSession.getStatus() == GlobalStatus.Begin) { // 初次提交

            // 先关闭 GlobalSession,不再允许注册分支事务。并释放分支事务占用的锁资源
            globalSession.closeAndClean();
            if (globalSession.canBeCommittedAsync()) { // 异步提交(AT),交给定时任务线程池去做
                globalSession.asyncCommit();// 将GlobalTransactionDO的状态修改为AsyncCommitting,由定时任务去处理
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else { // 不能异步处理,返回true马上提交
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    if (shouldCommit) {
        // 执行同步全局提交
        boolean success = doGlobalCommit(globalSession, false);
        // 分支事务没处理完但支持异步,还是改为异步状态以支持异步处理
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        // AsyncCommitting状态的直接返回Committed,表示完结
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

异步处理真正的提交

​ Server 启动时,DefaultCoordinator#init 会注册一个定时任务 handleAsyncCommitting(),默认每隔 1 秒执行一次。任务的逻辑是:

  1. 从数据库拉取 状态为 AsyncCommitting 的 GlobalTransactionDO,并将其转为 GlobalSession
  2. 对每个 GlobalSession 调用 DefaultCore#doGlobalCommit 完成提交操作
  3. doGlobalCommit 内部会遍历关联的分支事务,并逐一处理:
    • 分支状态为 PhaseOne_Failed:表示一阶段已经回滚,但全局事务仍然执行了 commit,这种情况下只会删除该分支,不会再触发回滚。
    • 通知 RM 执行分支事务的提交逻辑(具体逻辑后续再介绍)
    • 根据 RM 返回结果处理:
      • 成功:删除对应的 BranchTransactionDO(即 branch_table 数据)
      • 失败:后续可能重试,直到达到超时策略
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // 提交开始,记录指标
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        // 遍历分支事务,逐个提交
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // 非重试时,跳过可异步提交的分支(比如 AT 模式),交给异步线程处理
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
                /**
                 * 说明该分支在一阶段执行失败,并且回滚了本地事务
                 * 此时全局事务却进入提交流程,意味着发起方未感知到异常
                 * 
                 * 风险:如果发起方不回滚全局事务,将破坏全局一致性
                 * 当前处理:直接删除这个分支(因为它已回滚,不影响数据)
                 */
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // 通知客户端的RM处理其分支事务的提交操作
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession,
                        branchSession);
                if (isXaerNotaTimeout(globalSession, branchStatus)) {
                    branchStatus = BranchStatus.PhaseTwo_Committed;
                }
                switch (branchStatus) {
                    case PhaseTwo_Committed: // commit成功,直接删除这个BranchTransactionDO数据
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        // 分支提交失败,且不可重试,标记全局事务失败
                        SessionHelper.endCommitFailed(globalSession, retrying);
                        return false;

                    default:
                        if (!retrying) {
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            return CONTINUE;
                        } else {
                            return false;
                        }
                }
            } catch (Exception ex) {
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] { branchSession.toString() });
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // 如果 result != null,说明提前返回(遇到失败或特定条件)
        if (result != null) {
            return result;
        }
        // 还有分支事务且不能异步提交 -> 本次提交未完成,需重试
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    // 提交完成,所有关联的BranchTransactionDO数据都已被删除
    if (success && globalSession.getBranchSessions().isEmpty()) {
        if (!retrying) {
            globalSession.setStatus(GlobalStatus.Committed);
        }
        SessionHelper.endCommitted(globalSession, retrying);
    }
    return success;
}

RM处理commit

​ 当全局事务提交时,TC 会为其下的每个分支事务调用 AbstractCore#branchCommit 方法,并发送 BranchCommitRequest 请求给对应的 RM
​ 每个 BranchTransactionDO 都包含 resourceId(数据库标识)和 clientId(客户端标识),TC 通过 ChannelManager#getChannel 找到对应客户端的 Channel,实现定向消息发送

​ RM 收到请求后,会进入 ResourceManagerInbound#branchCommit,将分支事务封装为 Phase2Context 并投递到 AsyncWorker#commitQueue,等待定时任务异步处理。因此,RM 在收到消息后几乎立即返回,实际的 undo_log 删除操作是异步完成的

AsyncWorker#doBranchCommit

  • resourceId 分组,确保同一数据库复用同一 Connection
  • 通过资源管理器拿到数据源代理,再获取原始 Connection(避免再走代理,防止重新生成 undo_log
  • 执行批量删除对应 undo_log,完成分支事务提交
private void doBranchCommit() {
    if (commitQueue.isEmpty()) {
        return;
    }

    List<Phase2Context> allContexts = new LinkedList<>();
    // 倒出来
    commitQueue.drainTo(allContexts);

    // 按resourceId分组
    // resourceId标识具体的某个数据库。分组后数据库的所有操作可以在一个Connection里处理
    Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);

    groupedContexts.forEach(this::dealWithGroupedContexts);
}

private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
    if (StringUtils.isBlank(resourceId)) {
        return;
    }
    // 根据 resourceId 获取 DataSourceProxy
    DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
    if (dataSourceProxy == null) {
        addAllToCommitQueue(contexts);
        return;
    }

    Connection conn = null;
    try {
        // 获取真实数据库连接(非代理连接)
        conn = dataSourceProxy.getPlainConnection();
        // 根据数据库类型获取对应的 UndoLogManager 实现
        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());

        // 数据分片,默认1000
        List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
        for (List<Phase2Context> partition : splitByLimit) {
            // 按批删除 undo_log
            deleteUndoLog(conn, undoLogManager, partition);
        }
    } catch (SQLException sqlExx) {
        addAllToCommitQueue(contexts);
    } finally {
        IOUtil.close(conn);
    }

}

总结

​ 通过如上的详细分析,总结下二阶段Commit的核心流程

TC(事务协调器)

  • 接收到全局事务commit请求后,立即删除全局锁(LockDO)记录,以便资源尽快释放。
  • 然后异步处理每个分支事务的提交
    • 通过branchCommit请求通知对应的RM
    • 等待分支提交完成后,再删除对应的BranchTransactionDOGlobalTransactionDO,更新全局事务状态为Committed

RM(资源管理器)

  • 收到BranchCommitRequest后,快速返回(不阻塞TC)
  • 将提交任务封装成Phase2Context,投递到AsyncWorker.commitQueue异步执行undo_log清理
    • 根据resourceId分组复用数据库连接,减少开销。
    • 获取真实数据库连接(绕过代理,避免再次生成undo_log),批量删除对应undo_log,确保 AT 模式下的全局事务状态和数据状态最终一致

全局事务的回滚

​ 当业务执行过程中抛出异常,并命中回滚条件时,会进入 DefaultGlobalTransaction#rollback,启动全局事务回滚流程。和提交流程类似,参与者不能主动触发回滚,回滚操作由全局事务发起者控制,并带有默认重试机制,确保尽量完成回滚

​ 这也是分布式事务的核心价值所在:回滚依赖一阶段生成的 undo_log 进行反向补偿,保证整个微服务链中各应用的数据状态在同一时刻保持一致。不同于消息队列的最终一致性,AT 模式强调强一致性,因此 rollback 必须同步执行,不能异步处理

TC处理rollback

​ 由 TM 通知 TC 完成,TransactionManager#rollback 会发一个 GlobalRollbackRequest 给 TC,由 TC 来负责全局事务的回滚

​ TC 在 DefaultCore#doGlobalRollback 中处理 rollback,核心逻辑如下:

  1. 获取全局事务关联的所有分支事务 BranchSession逆序遍历,保证依赖顺序正确
  2. 每个 BranchSession 的处理逻辑
    1. PhaseOne_Failed:一阶段已经失败,本地事务已回滚,无需再操作,直接删除该分支
    2. 通知客户端的RM处理当前BranchSession的回滚操作
    3. 根据其RM的返回结果来处理
      • PhaseTwo_Rollbacked:回滚成功,删除对应的 BranchTransactionDO
      • PhaseTwo_RollbackFailed_Unretryable:回滚失败不可重试(客户端出现了SQLUndoDirtyException异常,数据被其他事务并发修改过),整体回滚标记失败,退出循环
      • 其他状态:将全局事务状态标记为 RollbackRetrying,等待定时任务重试,并结束当前循环
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    // start rollback event
    MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
    } else {
        // AT 模式:遍历全局事务的分支事务,并逆序回滚(保证事务依赖顺序)
        Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                // 分支事务一阶段就失败了(PhaseOne_Failed),说明已经回滚过,无需再次处理
                SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                return CONTINUE;
            }
            try {
                // 通知客户端的RM处理其分支事务的回滚操作
                BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                if (isXaerNotaTimeout(globalSession, branchStatus)) {
                    LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}",
                            globalSession.getXid(), branchSession.getBranchId());
                    branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                }
                // 根据RM处理的结果来决定是否继续
                switch (branchStatus) {
                    case PhaseTwo_Rollbacked:
                        // 当前分支回滚成功,删除数据库branch_table中对应的记录
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_RollbackFailed_Unretryable:
                        // 分支回滚失败且不可重试:标记全局事务回滚失败
                        // 这种情况为客户端前后镜像验证失败,可能在此期间需要回滚的数据被修改过(客户端产生了SQLUndoDirtyException异常)
                        SessionHelper.endRollbackFailed(globalSession, retrying);
                        // 返回false则表示前面还未处理的分支事务不会再回滚了
                        return false;
                    default:
                        // 分支回滚失败但可重试:将任务加入重试队列
                        if (!retrying) {
                            globalSession.queueToRetryRollback();
                        }
                        return false;
                }
            } catch (Exception ex) {
                if (!retrying) {
                    globalSession.queueToRetryRollback();
                }
                throw new TransactionException(ex);
            }
        });
        if (result != null) { // result 不为 null,说明某个分支回滚失败或特定条件触发提前返回
            return result;
        }
    }

    // AT 模式下全局事务回滚完成
    if (success) {
        SessionHelper.endRollbacked(globalSession, retrying);
        LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
    }
    return success;
}

RM处理rollback

​ RM 执行回滚逻辑时,核心入口是 DataSourceManager#branchRollback,其内部会根据数据库类型选择对应的 UndoLogManager,最终调用 UndoLogManager#undo 完成回滚操作

UndoLogManager#undo

一个分支事务可能包含多条需要回滚的数据记录,每条对应一个 SQLUndoLog。核心流程如下:

  • 查询 Undo 日志:根据 xidbranchId 查询当前分支事务对应的 undo_log,并解析为 SQLUndoLog 列表。
  • 开启本地事务: 在本地事务中执行所有回滚操作,保证原子性
  • 反向逐条回滚:遍历 SQLUndoLog为每条生成对应的 UndoExecutor,并执行反向 SQL 恢复数据
  • 收尾处理
    • 存在undo_log所有回滚 SQL 执行完毕后,删除 undo_log 记录并提交事务
    • 不存在undo_log:可能是分支一阶段提交超时导致全局回滚,此时插入一条状态为 GlobalFinished 的空记录,阻止其一阶段的提交
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    ConnectionProxy connectionProxy = null;
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    // 无限重试,直到成功
    for (;;) {
        try {
            connectionProxy = dataSourceProxy.getConnection();
            conn = connectionProxy.getTargetConnection();

             // 开启事务来执行undo_log的回滚
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // 查询 undo_log,并加行级锁(FOR UPDATE),防止并发回滚冲突
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;

                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                // 状态校验,确保Undo_log可回滚
                if (!canUndo(state)) {
                    return;
                }

                // 获取 undo_log 反序列化解析出来
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    setCurrentSerializer(parser.getName());
                    // 逆序执行 SQLUndoLog(防止依赖问题)
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    // 依次对每条SQLUndoLog创建对应数据库类型的 UndoExecutor 并执行回滚操作
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType())
                                .getTableMeta(
                                        conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(connectionProxy);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }

            if (exists) {
                // 回滚完就删除undo_log
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                // 如果 undo_log 不存在,说明已回滚过,插入 GlobalFinished 防止重复回滚
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
            }
            return;
        } catch (SQLIntegrityConstraintViolationException e) {
        } catch (Throwable e) {
            // Connection回滚并抛出异常

        } finally {
            // close相关操作
        }
    }
}

AbstractUndoExecutor

​ 这个类是生成 undo_log 对应的反向补偿 SQL 并执行的基类。它本身定义了模板方法 buildUndoSQL(),由各个具体子类去实现具体的回滚 SQL 构建逻辑。重点看看它的 dataValidationAndGoOn 校验方法

​ 默认情况下,IS_UNDO_DATA_VALIDATION_ENABLE = true,也就是说每条 SQLUndoLog 在执行反向 SQL 前都会先走这个校验逻辑,核心目的就是防止脏回滚,保证数据安全

dataValidationAndGoOn的校验规则总结如下:

  • beforeImage 和 afterImage 一致
    说明这条数据在事务里根本没变,回滚也没意义,直接返回 false,跳过补偿
  • 最新数据和 afterImage 不一致
    说明数据在当前分布式事务还没回滚前,被其他线程或事务改过,这时候要分情况:
    1. 最新数据和 beforeImage 一致
      说明回滚其实已经完成了,那就不用再补偿,直接跳过
    2. 最新数据既不等于 beforeImage,也不等于 afterImage
      说明这条记录被其他非全局事务修改过,而且没有加 @GlobalLock。这时候 Seata 直接抛 SQLUndoDirtyException,终止整个全局回滚,避免把别人提交的数据误删。

​ 这里的风险点在于:非全局事务(没有开启分布式事务控制)是能直接修改那些处于全局事务中的未提交数据,这就是“脏写”问题。为了避免这种情况,如果某个业务不想用分布式事务,但又有并发写的可能,就可以用 @GlobalLock 给修改的数据加全局锁,从而防止并发修改。当然,你要是想无视这个风险,简单粗暴的将IS_UNDO_DATA_VALIDATION_ENABLE配置为false,关掉校验就行

public abstract class AbstractUndoExecutor {
    // 是否开启undo_log的镜像前后校验,默认为true
    public static final boolean IS_UNDO_DATA_VALIDATION_ENABLE = ConfigurationFactory.getInstance()
            .getBoolean(ConfigurationKeys.TRANSACTION_UNDO_DATA_VALIDATION, DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION);

    protected boolean dataValidationAndGoOn(ConnectionProxy conn) throws SQLException {

        TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
        TableRecords afterRecords = sqlUndoLog.getAfterImage();

        // DataCompareUtils#isRecordsEquals方法用来比较两个TableRecords是否逻辑一致(即对应行的字段数据是否一致)
        // 判断 beforeImage 和 afterImage 是否一致
        Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
        if (beforeEqualsAfterResult.getResult()) {
            // 镜像相同,直接返回false,不用执行回滚操作了
            return false;
        }

        // 查询当前数据库中最新的数据
        TableRecords currentRecords = queryCurrentRecords(conn);
        Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
        if (!afterEqualsCurrentResult.getResult()) { // 如果当前数据 不等于 afterImage,则说明数据可能被修改过,进一步判断是否回到 beforeImage 状态

            Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);

            if (beforeEqualsCurrentResult.getResult()) { // 当前数据已经回到 beforeImage,说明回滚已经完成或本地事务回滚成功,无需再做操作
                return false;
            } else {
                // 产生脏数据了:当前数据既不等于 afterImage,也不等于beforeImage
                // 说明被其他非全局事务修改过(其他非全局事务没用@GlobalLock)
               
                // 脏数据异常,无法安全回滚。TC会终止全局回滚
                throw new SQLUndoDirtyException("Has dirty records when undo.");
            }
        }
        return true;
    }
}

总结

​ 一句话总结:整个全局事务的回滚就是同步的依次将每个分支事务的所有UndoLog数据分别解析为反向补偿SQL并执行(insert变delete、delete变insert、update还是update)