Spring-Transactional


​ 从源码分析了@Transactional的解析和切面使用流程。Spring事物如此复杂是因为不同的PROPAGATION有不同的策略,特别是在嵌套的流程中,所以重点分析了PlatformTransactionManager的事物获取流程,深入了解了PROPAGATION的实现

@EnableTransactionManagement

​ 开启Spring基于注解的事务管理,其背后主要通过导入一个基础配置类 ProxyTransactionManagementConfiguration 来实现核心功能。

该配置类定义并注册了事务功能所依赖的三个关键 Bean:

Bean 名称 作用概述
TransactionAttributeSource 用于解析方法或类上的 @Transactional 注解,提取事务属性(如传播行为、回滚规则等)。默认实现为 AnnotationTransactionAttributeSource
TransactionInterceptor 真正的事务拦截器,在目标方法执行前后进行事务管理操作(开启、提交、回滚等)。本质上是一个 MethodInterceptor
BeanFactoryTransactionAttributeSourceAdvisor 是 Spring AOP 中的 Advisor,封装了 TransactionAttributeSourceTransactionInterceptor,通过切点判断哪些方法需要事务,并将拦截器应用到这些方法上。

BeanFactoryTransactionAttributeSourceAdvisor

​ 是一个PointcutAdvisor,既可以解析方法和类的注解事物信息,也能将真正的拦截器应用到代理中,实现注解事物的支持。

TransactionAttributeSourcePointcut

TransactionAttributeSourcePointcut 是 Spring 声明式事务机制中的关键组件之一,它继承自 StaticMethodMatcherPointcut,这是一个仅关注方法匹配、对类无过滤限制的 Pointcut 实现

为什么这么设计?因为在事务注解的处理逻辑中

  • 是否应用事务主要取决于方法级别是否存在 @Transactional 注解
  • 方法上无注解,才会回退查找类级别上的事务配置
  • 因此,从设计上讲,应该保留对所有类的匹配资格,不能在 ClassFilter 层面就提前“判死刑”

换句话说,Spring 不会直接排除某些类,而是逐个方法检查是否具备事务属性,从而实现最大化的灵活性和兼容性。

public abstract class StaticMethodMatcherPointcut extends StaticMethodMatcher implements Pointcut {

    // 始终匹配为true的ClassFilter
    private ClassFilter classFilter = ClassFilter.TRUE;

    public void setClassFilter(ClassFilter classFilter) {
        this.classFilter = classFilter;
    }

    @Override
    public ClassFilter getClassFilter() {
        return this.classFilter;
    }

    @Override
    public final MethodMatcher getMethodMatcher() {
        return this;
    }

}

方法解析


public boolean matches(Method method, Class<?> targetClass) {
    // === 排除掉一些不需要事务增强的内部类 ===
    // 例如事务代理自身、事务管理器、异常翻译器等,避免不必要的切面增强
    if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
        PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
        PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
        return false;
    }

    // === 通过 TransactionAttributeSource 判断该方法是否需要事务处理 ===
    // 如果解析不到事务属性(即无 @Transactional 注解),则不匹配
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

TransactionAttributeSource

TransactionAttributeSource 是 Spring 中用于解析方法/类上的事务注解的核心接口,Spring 的默认实现为 **AnnotationTransactionAttributeSource**。

它的作用是从方法或类中提取事务元信息,并将其封装为一个 TransactionAttribute 对象,供后续事务切面使用。

支持的注解类型包括:

  • Spring 自身的 @org.springframework.transaction.annotation.Transactional
  • Java 标准的 @javax.transaction.Transactional
  • EJB 标准的 @javax.ejb.TransactionAttribute

​ Spring 的 @Transactional 注解最终会被解析为一个 RuleBasedTransactionAttribute 对象,该对象包含了传播行为、隔离级别、超时、只读等完整事务配置。

解析优先级

​ 在执行方法事务增强前,Spring 会根据如下顺序查找事务注解,一旦某一层找到了,就立即返回,停止继续向上查找

  1. 目标类中“具体实现方法”上的注解
  2. 目标类上(Class级别)的注解
  3. 接口中定义的“方法”上的注解
  4. 接口自身(Class级别)的注解

关键源码

public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
        implements Serializable {
    // javax.transaction.Transactional注解支持
    private static final boolean jta12Present;

    // javax.ejb.TransactionAttribute注解支持
    private static final boolean ejb3Present;

    static {
        ClassLoader classLoader = AnnotationTransactionAttributeSource.class.getClassLoader();
        jta12Present = ClassUtils.isPresent("javax.transaction.Transactional", classLoader);
        ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute", classLoader);
    }
    // 是否只代理public方法,默认为true
    private final boolean publicMethodsOnly;

    // 事物解析器,顺序有优先级
    // 1. SpringTransactionAnnotationParser (这个是一定会有的)
    // 2. JtaTransactionAnnotationParser(需要有javax.transaction.Transactional注解)
    // 3. Ejb3TransactionAnnotationParser(需要有javax.ejb.TransactionAttribute注解)
    private final Set<TransactionAnnotationParser> annotationParsers;

    public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        if (method.getDeclaringClass() == Object.class) {
            return null;
        }

        // 先看看有没有缓存
        Object cacheKey = getCacheKey(method, targetClass);
        TransactionAttribute cached = this.attributeCache.get(cacheKey);
        if (cached != null) {
            // NULL_TRANSACTION_ATTRIBUTE表示无事物
            if (cached == NULL_TRANSACTION_ATTRIBUTE) {
                return null;
            } else {
                return cached;
            }
        } else {
            // 事物信息查找
            TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
            // 缓存并返回
            if (txAttr == null) {
                this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
            } else {
                String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
                if (txAttr instanceof DefaultTransactionAttribute) {
                    ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
                }
                if (logger.isTraceEnabled()) {
                    logger.trace(
                            "Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
                }
                this.attributeCache.put(cacheKey, txAttr);
            }
            return txAttr;
        }
    }

    protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
        // 默认非public方法不支持
        if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
            return null;
        }
        // 获取最具体的方法(桥接方法处理),确保是目标类中的实现方法
        Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

        // 先解析方法上的解析事务注解
        TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
        if (txAttr != null) {
            return txAttr;
        }

        // 方法上没有,在解析类上的事务注解
        txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
        if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
            return txAttr;
        }
        // === 此时 specificMethod ≠ method(说明原始 method 是接口定义) ===
        // 再尝试解析接口方法本身及其声明类(即接口)上的事务注解
        if (specificMethod != method) {
            txAttr = findTransactionAttribute(method);
            if (txAttr != null) {
                return txAttr;
            }
            txAttr = findTransactionAttribute(method.getDeclaringClass());
            if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                return txAttr;
            }
        }

        return null;
    }
}

TransactionInterceptor

​ TransactionInterceptor 是用于处理声明式事务(@Transactional 注解)的 AOP 拦截器,本质上是一个环绕(Around)通知,拦截被 @Transactional 标注的方法。

​ 它自身并不直接实现事务的提交、回滚等核心逻辑,而是将这些操作委托给底层的PlatformTransactionManager 来完成。

主要职责

  • 从 TransactionAttributeSource 中获取事务属性(如传播行为、隔离级别、是否只读等)
  • 调用 TransactionManager 获取或创建事务
  • 执行目标方法,捕获异常判断是否回滚
  • 正常返回时提交事务,异常时进行回滚
  • 管理当前事务上下文(通过 ThreadLocal)以支持事务传播

关键源码

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    // =============== 父类TransactionAspectSupport中的字段 =================
    private static final ThreadLocal<TransactionInfo> transactionInfoHolder = new NamedThreadLocal<>(
            "Current aspect-driven transaction");

    /**
     * AOP拦截器核心逻辑:拦截事务方法,并根据事务配置(声明式 or 编程式)处理事务生命周期
     */
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        TransactionAttributeSource tas = getTransactionAttributeSource();
        // 获取事务属性
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // 获取beanFactory中的TransactionManager(可能是DataSourceTransactionManager、JtaTransactionManager等)
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        // 构造方法唯一标识(例如:site.shanzhao.UserService.save)
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        // 声明式事务处理(@Transactional注解就走这)
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 创建事务信息对象,必要时启动事务
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // 执行被增强的方法
                retVal = invocation.proceedWithInvocation();
            } catch (Throwable ex) {
                // 处理异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            } finally {
                // 清理ThreadLocal中的事务信息
                cleanupTransactionInfo(txInfo);
            }
            // 方法正常返回则提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else { // 编程式事务处理
            Object result;
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            try {
                result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                    try {
                        return invocation.proceedWithInvocation();
                    } catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            } else {
                                throw new ThrowableHolderException(ex);
                            }
                        } else {
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    } finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });
            } catch (ThrowableHolderException ex) {
                throw ex.getCause();
            } catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            } catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }

            // Check result state: It might indicate a Throwable to rethrow.
            if (throwableHolder.throwable != null) {
                throw throwableHolder.throwable;
            }
            return result;
        }
    }

    /**
     * 根据事务属性创建事务(如果有必要)。
     *
     * - 若当前方法配置了事务属性(如 @Transactional),则获取事务管理器并尝试开启事务。
     * - 若未指定事务名称,则使用方法签名作为事务名称(便于日志跟踪和调试)。
     * - 最终返回一个封装了事务状态的 TransactionInfo 对象(用于后续提交或回滚处理)。
     *
     */
    protected TransactionInfo createTransactionIfNecessary(
            @Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr,
            final String joinpointIdentification) {

        // 如果没有设置事务名称,则使用方法签名作为名称,并封装成 DelegatingTransactionAttribute。
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;

        // 若存在事务属性,表示方法需要事务处理
        if (txAttr != null) {
            if (tm != null) {
                // 调用事务管理器开启事务,返回事务状态(可能是新事务,也可能是参与已有事务)
                status = tm.getTransaction(txAttr);
            } else {
                // 没有配置事务管理器,无法执行事务控制,打印调试日志
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }

        // 构造 TransactionInfo 对象,并将其绑定到父类TransactionAspectSupport#ThreadLocal 中,
        // 用于在线程内部保存当前事务的上下文信息(如 TransactionStatus、事务属性等),
        // 以支持事务传播、回滚控制和资源清理等操作
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

    /**
     * 在目标方法抛出异常后完成事务处理(回滚或提交)
     *
     * 该方法根据事务属性(如 @Transactional 中的 rollbackFor)判断是否需要回滚,
     * 若不需要回滚则尝试提交(注意:提交时内部仍可能因为标记了 rollbackOnly 而实际执行回滚)。
     *
     */
    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                // 命中回滚异常,需要操作事务回滚
                try {
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                } catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                } catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
            } else {
                // 出现了异常但不需要回滚。则尝试commit(不一定真的会commit,内部还是可能根据rollbackOnly来判断是否rollback)
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                } catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                } catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
            }
        }
    }
}

PlatformTransactionManager

​ PlatformTransactionManager 抽象了事务的获取、提交、回滚,是 Spring 事务架构的核心入口。所有 @Transactional 的底层实现最终都依赖它完成真正的事务操作。基本实现类为DataSourceTransactionManager

TransactionStatus

​ 默认实现为DefaultTransactionStatus。每个@Transactional注解都会生成一个DefaultTransactionStatus,用于表示当前事物的允许状态。在事务执行流程中,它作为事务的运行时上下文贯穿始终,PlatformTransactionManager 对其进行更新和查询,以决定事务的提交、回滚及其他控制逻辑。

其中的isNewTransaction()方法决定了当前@Transactional生成的事物是否有资格进行commit或rollback

public class DefaultTransactionStatus extends AbstractTransactionStatus {

    /**
     * 当前事务对象,一般为具体事务实现(如 DataSourceTransactionObject)。
     *
     * - 若当前方法运行在事务上下文中,则 非null;
     * - 若未开启事务(如事务传播行为为 NOT_SUPPORTED),则为 null;
     *
     * 注意:即使在同一个物理事务中,每个 @Transactional 方法对应的 DefaultTransactionStatus 实例不同,
     * 但它们内部的 transaction(如 ConnectionHolder)可能是同一个对象,表示共享同一底层连接。
     */
    @Nullable
    private final Object transaction;

    /**
     * 是否是“新事务”的创建者。
     *
     * - true:当前方法通过传播行为触发了事务创建;
     * - false:当前方法加入了已有事务;
     *
     * 注意:该值为 true 不等于事务实际存在,需结合 transaction 字段判断是否真有事务资源。
     */
    private final boolean newTransaction;

    /**
     * 是否注册了 TransactionSynchronizationManager,同步管理器用于事务钩子(如 afterCommit)。
     * 即是否是本方法负责事务同步的初始化(如绑定资源、触发同步回调)。
     */
    private final boolean newSynchronization;

    /**
     * 当前事务是否只读,由 @Transactional 配置
     */
    private final boolean readOnly;

    /**
     * 当前日志是否是debug等级以上
     */
    private final boolean debug;

    /**
     * 很重要
     * 如果当前事务为嵌套事务或需要挂起上一个事务,则用于保存被挂起的事务资源。
     *
     * 一般为 SuspendedResourcesHolder 类型,用于在当前事务完成后恢复之前的事务状态。
     */
    @Nullable
    private final Object suspendedResources;

    // ============ 父类AbstractTransactionStatus中的字段 ============

    /**
     * 标识当前事务是否被标记为回滚(通常通过 setRollbackOnly() 触发)。
     */
    private boolean rollbackOnly = false;

    /**
     * 当前事务是否已完成(无论提交还是回滚)。
     */
    private boolean completed = false;

    /**
     * 用于保存事务的保存点(Savepoint),支持嵌套事务回滚。
     */
    @Nullable
    private Object savepoint;


    /**
     * 很重要的方法,决定了当前@Transactional生成的事物是否有资格进行commit或rollback
     */
    public boolean isNewTransaction() {
        return (hasTransaction() && this.newTransaction);
    }
}

DataSourceTransactionManager

Spring 默认提供的基于 JDBC 数据源(javax.sql.DataSource)的 PlatformTransactionManager 实现

核心方法getTransaction 流程总结

  • 首先判断当前线程是否已绑定事务(即 transactionActive 状态);若存在事务,则代表当前可能处于嵌套或参与中 事务的处理场景
  • 根据 TransactionDefinition 中指定的事务传播行为(PROPAGATION_*)来决定事务处理策略:
    • 是否加入当前事务
    • 是否挂起已有事务
    • 是否开启新事务
  • 最终构建 DefaultTransactionStatus 实例,其中封装了当前事务的控制信息(如是否新建、是否可回滚、是否只读等)
  • 如果事务被创建(非参与),还会:
    • 设置数据库连接的自动提交为 false
    • 绑定资源到当前线程(ThreadLocal)
    • 注册事务同步管理器(TransactionSynchronizationManager)以支持hook函数的调用(如 beforeCommit、afterCompletion 等)

关键源码

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    /**
     * 事物的获取(父类中AbstractPlatformTransactionManager的方法 )
     */
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
        // 获取当前事务上下文对象,默认为 DataSourceTransactionObject
        // 初次进入时内部的 ConnectionHolder 为空(即还未绑定连接)
        Object transaction = doGetTransaction();

        // 缓存日志等级判断,避免多次调用 logger.isDebugEnabled()
        boolean debugEnabled = logger.isDebugEnabled();

        // 若无自定义事务定义,使用默认配置(PROPAGATION_REQUIRED,ISOLATION_DEFAULT 等)
        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }

        // ========== 检查当前线程是否已存在事务 ========== //
        // 条件:当前线程绑定了 ConnectionHolder 且其 transactionActive = true
        if (isExistingTransaction(transaction)) {
            // 已存在事务,根据传播行为判断处理方式
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // ========== 走到这表示还没有事物,根据传播行为判断是否需要新建事务 ========== //

        // 超时时间校验:不能小于默认值(-1)
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // ========== 根据事务传播行为做决策 ========== //

        // 传播行为为 MANDATORY,但当前又没有事务,抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }

        // 传播行为为 REQUIRED、REQUIRES_NEW、NESTED 都需要新建事务
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

            // 先挂起当前事务(无事务时挂空)
            SuspendedResourcesHolder suspendedResources = suspend(null);

            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }

            try {
                // 判断是否启用事务同步,默认是 true(除非设置为 NEVER)
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

                // 构建 DefaultTransactionStatus,封装事务上下文信息
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

                // 初始化事务:如创建连接、设置隔离级别、timeout,并将资源绑定到当前线程
                doBegin(transaction, definition);

                // 注册事务同步器(如触发 beforeCommit、afterCompletion 等回调)
                prepareSynchronization(status, definition);

                return status;
            } catch (RuntimeException | Error ex) {
                // 异常时恢复挂起的事务资源
                resume(null, suspendedResources);
                throw ex;
            }
        }

        // 传播行为为 SUPPORTS、NOT_SUPPORTED、NEVER —— 不启动实际事务,只做同步处理(如果配置了)
        else {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }

            // 是否启用事务同步,SYNCHRONIZATION_ALWAYS 时启用
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

            // 创建一个“空事务”状态,主要用于注册同步器,但没有实际事务操作
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        // NEVER:完全不支持事务,当前线程存在事务则抛异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        // NOT_SUPPORTED:暂停(挂起)当前事务,然后以非事务的方式运行
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        // REQUIRES_NEW:暂停当前事务,新开一个事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            // 暂停当前事务,方便之后还原
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            } catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        // NESTED:先判断是否允许嵌套事务(默认允许),然后优先通过数据库保存点(Savepoint)实现
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                                "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) { // 默认走这,使用数据库Savepoint模拟嵌套事务
                DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false,
                        debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            } else { // 少数情况使用真正的嵌套事务,如JTA中可能嵌套 begin/commit 调用
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        // 传播策略为SUPPORTS 或 REQUIRED 或 MANDATORY
        // 这三个对已存在的事务处理方式是一致的,啥也不做,也就是直接加入已存在的事务
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            // 校验当前传播策略的隔离等级和是否只读与已存在的事务是否一致,不过默认不校验。
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: "
                            +
                            (currentIsolationLevel != null
                                    ? isoConstants.toCode(currentIsolationLevel,
                                            DefaultTransactionDefinition.PREFIX_ISOLATION)
                                    : "(unknown)"));
                }
            }
            // 校验只读属性
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        // 创建事务状态对象,参与已存在事务
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

    /**
     * 开启事物
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            // 是否需要新建 Connection:满足任意一个条件即可
            // 1. 当前没有 ConnectionHolder(首次创建事务)
            // 2. 当前 ConnectionHolder 已参与过其他事务(如嵌套事务 REQUIRES_NEW 中旧连接已同步过)。此时不能复用旧连接,必须重新获取
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            // 标记当前连接已参与事务同步
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            // 根据事务定义设置隔离级别,并记录旧值,事务完成后会恢复
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // 当前Connection设为手动提交。此时把事物提交的控制权交给了Spring
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            // readOnly 设置
            prepareTransactionalConnection(con, definition);
            // 标记事务为“激活”状态(事务已开启)
            txObject.getConnectionHolder().setTransactionActive(true);

            // 设置事务超时时间(单位:秒)
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // 将当前 ConnectionHolder 绑定到线程上下文中(核心 ThreadLocal 操作)
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            // 如果创建连接过程中出现异常,释放资源并清理 ConnectionHolder
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

    /**
     * 事物的commit判断,根据rollbackOnly字段也可能走rollback。(父类中AbstractPlatformTransactionManager的方法)
     */
    public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) { // 事务已完成(提交或回滚),不允许再次提交或回滚,属于非法状态
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        // 标记为rollbackOnly的走强制回滚
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            processRollback(defStatus, false);
            return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            // 当前事务被标记为全局回滚,说明内部事务标记为回滚,但外层需要commit。
            // 所以会先回滚,再抛UnexpectedRollbackException异常

            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            // 出现了意外(外层commit,但内层rollback)unexpected为true
            processRollback(defStatus, true);
            return;
        }

        // 正常事务提交流程
        // 1. 调用 beforeCommit 和 beforeCompletion 钩子
        // 2. 如果存在保存点(Savepoint)则处理嵌套事务
        // 3. 真正调用底层数据库事务管理器进行提交
        // 4. 调用 afterCommit 和 afterCompletion 钩子
        // 5. 捕获并处理提交中的异常
        processCommit(defStatus);
    }
}

PROPAGATION总结

  1. 肯定派,必须存在事务

    传播行为 说明
    PROPAGATION_REQUIRED (默认) 不存在就新建,存在就加入
    PROPAGATION_REQUIRES_NEW 始终新建一个事务,且与旧事物完全隔离。也就至少有两此commit或rollback。存在原事务时会将原事务封装为SuspendedResourcesHolder,再重新获取一个新的数据库连接开启事务,等这个新事物运行完,再把SuspendedResourcesHolder复原
    PROPAGATION_NESTED 不存在就新建,存在就新开一个嵌套的事务。(先检查nestedTransactionAllowed,为false就抛异常了,代表不支持嵌套事务。不过默认为true。)和PROPAGATION_REQUIRES_NEW的区别在于这是用数据库的Savepoint实现,至始至终只会存在一个事务,如果当前回滚,也只会退回到Savepoint,不会对外层的事务造成影响,如果都能提交,最终也只有一次真正的事务commit。所以,注册的TransactionSynchronization hook函数也只会等待整个事务的结束来回调。如果不支持安全点(JTA),那实现就完全等于PROPAGATION_REQUIRES_NEW
    PROPAGATION_MANDATORY 支持已存在的事务,不存在则直接抛异常
  2. 中立派,不一定有事务

    传播行为 说明
    PROPAGATION_SUPPORTS 源码中并没有处理这种的传播策略。所以,它的作用是:存在事务就加入,不存在就以非事务的方式运行
  3. 否定派,不支持事务

    传播行为 说明
    PROPAGATION_NOT_SUPPORTED 以非事务的方式运行(有事务就暂停事务,重新获取一个没有事物的Connection使用。没有就啥也不做),实现和上面的差不多。但默认仍会注册事物同步器,让当前这个事物始终用同一个Connection
    PROPAGATION_NEVER 强制性的不支持事务,要是当前存在事务,就直接抛异常

TransactionSynchronizationManager

​ 是Spring事物体系中的一个辅助工具,维护线程级事务上下文信息,保证事务行为在多层调用、事务传播、资源管理等场景中保持一致。

Spring 的事务是通过 AOP 管理的跨方法事务逻辑。Spring 需要有个全局但又线程安全的上下文去维护事务信息。如果没有它,则:

  • 不同方法无法共享事务资源(比如同一个 JDBC Connection)

  • 无法实现事务传播(比如事务嵌套时决定是否复用原事务)

  • 无法注册和触发事务的生命周期回调(如 beforeCommitafterCompletion

关键字段

public abstract class TransactionSynchronizationManager {

    // 绑定到当前线程的资源,比如数据库连接(key 为 DataSource ,value 为ConnectionHolder)
    // Spring 在事务开始时绑定,在事务完成后解除绑定。
    private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

    // 当前线程注册的事务同步器集合(TransactionSynchronization),
    // 用于在事务生命周期各个阶段(如提交前、提交后、回滚后)执行hook回调。
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>(
            "Transaction synchronizations");

    // 当前事务的名称(来自 TransactionDefinition#getName),通常用于日志记录和调试。
    private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>(
            "Current transaction name");

    // 当前事务是否被声明为只读,影响底层连接是否设置为只读模式(如 Connection#setReadOnly(true))。
    private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>(
            "Current transaction read-only status");

    // 当前事务的隔离级别(来自 TransactionDefinition)
    // 决定数据库访问的一致性级别,例如 READ_COMMITTED、REPEATABLE_READ 等。
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>(
            "Current transaction isolation level");

    // 当前事物是否被激活(即运行在有事物的情况下),不一定有commit或rollback的资格
    private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>(
            "Actual transaction active");

}

DataSourceUtils

​ 前面我们分析了 Spring 事务的整体流程,但还有1个关键问题:在实际项目中,Spring 很少直接作为持久化框架,MyBatis、Hibernate 等才是执行数据库操作的主力。那么事务是 Spring 管的,数据库连接却是由 MyBatis 获取的,两者如何协同?

这就引出了 DataSourceUtils 的作用:为持久层框架提供“Spring 管控下的连接”,确保它们获取到的 Connection 是当前事务中绑定的那个,这样事务控制(包括 commit/rollback)才能正确生效。

核心源码

public abstract class DataSourceUtils {
    

    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");

        // 1. 尝试从 TransactionSynchronizationManager 中获取线程绑定的 ConnectionHolder
        // 只要TransactionSynchronizationManager.isSynchronizationActive(),都应该获取的是同一个ConnectionHolder
        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);

        // 2. 如果当前线程中存在已关联的 ConnectionHolder 且已经持有连接,或者它被事务标记为已同步
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            // 引用计数
            conHolder.requested();

            // 如果还没有设置具体的连接,则新建连接并设置到 holder 中(resume 的场景)
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(fetchConnection(dataSource));
            }

            return conHolder.getConnection();
        }
        // =============== 否则:没有事务,或没有绑定连接的事务 ==================

        logger.debug("Fetching JDBC Connection from DataSource");

        // 3. 获取一个新的物理连接(不在事务中或事务尚未绑定连接)
        Connection con = fetchConnection(dataSource);

        // 如果当前事物的事物同步器被激活,则将上面获取的Connection也绑定到事物资源里。让当前事物始终用用一个Connection
        // 就算当前事物是NOT_SUPPORTED,默认也会缓存ConnectionHolder
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            try {
                ConnectionHolder holderToUse = conHolder;

                if (holderToUse == null) {
                    holderToUse = new ConnectionHolder(con);
                } else {
                    holderToUse.setConnection(con);
                }

                holderToUse.requested();

                // 注册一个事物同步器用于hook
                TransactionSynchronizationManager.registerSynchronization(
                        new ConnectionSynchronization(holderToUse, dataSource));

                // 标记该连接已参与事务同步
                holderToUse.setSynchronizedWithTransaction(true);

                // 第一次调用时才绑定:DataSource -> ConnectionHolder
                if (holderToUse != conHolder) {
                    TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
                }

            } catch (RuntimeException ex) {
                releaseConnection(con, dataSource);
                throw ex;
            }
        }

        return con;
    }
}