Spring-Transactional
从源码分析了@Transactional的解析和切面使用流程。Spring事物如此复杂是因为不同的PROPAGATION有不同的策略,特别是在嵌套的流程中,所以重点分析了PlatformTransactionManager的事物获取流程,深入了解了PROPAGATION的实现
@EnableTransactionManagement
开启Spring基于注解的事务管理,其背后主要通过导入一个基础配置类 ProxyTransactionManagementConfiguration
来实现核心功能。
该配置类定义并注册了事务功能所依赖的三个关键 Bean:
Bean 名称 | 作用概述 |
---|---|
TransactionAttributeSource | 用于解析方法或类上的 @Transactional 注解,提取事务属性(如传播行为、回滚规则等)。默认实现为 AnnotationTransactionAttributeSource 。 |
TransactionInterceptor | 真正的事务拦截器,在目标方法执行前后进行事务管理操作(开启、提交、回滚等)。本质上是一个 MethodInterceptor 。 |
BeanFactoryTransactionAttributeSourceAdvisor | 是 Spring AOP 中的 Advisor ,封装了 TransactionAttributeSource 和 TransactionInterceptor ,通过切点判断哪些方法需要事务,并将拦截器应用到这些方法上。 |
BeanFactoryTransactionAttributeSourceAdvisor
是一个PointcutAdvisor,既可以解析方法和类的注解事物信息,也能将真正的拦截器应用到代理中,实现注解事物的支持。
TransactionAttributeSourcePointcut
TransactionAttributeSourcePointcut
是 Spring 声明式事务机制中的关键组件之一,它继承自 StaticMethodMatcherPointcut
,这是一个仅关注方法匹配、对类无过滤限制的 Pointcut 实现。
为什么这么设计?因为在事务注解的处理逻辑中
- 是否应用事务主要取决于方法级别是否存在
@Transactional
注解 - 若方法上无注解,才会回退查找类级别上的事务配置
- 因此,从设计上讲,应该保留对所有类的匹配资格,不能在
ClassFilter
层面就提前“判死刑”
换句话说,Spring 不会直接排除某些类,而是逐个方法检查是否具备事务属性,从而实现最大化的灵活性和兼容性。
public abstract class StaticMethodMatcherPointcut extends StaticMethodMatcher implements Pointcut {
// 始终匹配为true的ClassFilter
private ClassFilter classFilter = ClassFilter.TRUE;
public void setClassFilter(ClassFilter classFilter) {
this.classFilter = classFilter;
}
@Override
public ClassFilter getClassFilter() {
return this.classFilter;
}
@Override
public final MethodMatcher getMethodMatcher() {
return this;
}
}
方法解析
public boolean matches(Method method, Class<?> targetClass) {
// === 排除掉一些不需要事务增强的内部类 ===
// 例如事务代理自身、事务管理器、异常翻译器等,避免不必要的切面增强
if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
return false;
}
// === 通过 TransactionAttributeSource 判断该方法是否需要事务处理 ===
// 如果解析不到事务属性(即无 @Transactional 注解),则不匹配
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
TransactionAttributeSource
TransactionAttributeSource
是 Spring 中用于解析方法/类上的事务注解的核心接口,Spring 的默认实现为 **AnnotationTransactionAttributeSource
**。
它的作用是从方法或类中提取事务元信息,并将其封装为一个 TransactionAttribute
对象,供后续事务切面使用。
支持的注解类型包括:
- Spring 自身的
@org.springframework.transaction.annotation.Transactional
- Java 标准的
@javax.transaction.Transactional
- EJB 标准的
@javax.ejb.TransactionAttribute
Spring 的 @Transactional
注解最终会被解析为一个 RuleBasedTransactionAttribute
对象,该对象包含了传播行为、隔离级别、超时、只读等完整事务配置。
解析优先级
在执行方法事务增强前,Spring 会根据如下顺序查找事务注解,一旦某一层找到了,就立即返回,停止继续向上查找
- 目标类中“具体实现方法”上的注解
- 目标类上(Class级别)的注解
- 接口中定义的“方法”上的注解
- 接口自身(Class级别)的注解
关键源码
public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
implements Serializable {
// javax.transaction.Transactional注解支持
private static final boolean jta12Present;
// javax.ejb.TransactionAttribute注解支持
private static final boolean ejb3Present;
static {
ClassLoader classLoader = AnnotationTransactionAttributeSource.class.getClassLoader();
jta12Present = ClassUtils.isPresent("javax.transaction.Transactional", classLoader);
ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute", classLoader);
}
// 是否只代理public方法,默认为true
private final boolean publicMethodsOnly;
// 事物解析器,顺序有优先级
// 1. SpringTransactionAnnotationParser (这个是一定会有的)
// 2. JtaTransactionAnnotationParser(需要有javax.transaction.Transactional注解)
// 3. Ejb3TransactionAnnotationParser(需要有javax.ejb.TransactionAttribute注解)
private final Set<TransactionAnnotationParser> annotationParsers;
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}
// 先看看有没有缓存
Object cacheKey = getCacheKey(method, targetClass);
TransactionAttribute cached = this.attributeCache.get(cacheKey);
if (cached != null) {
// NULL_TRANSACTION_ATTRIBUTE表示无事物
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
} else {
return cached;
}
} else {
// 事物信息查找
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// 缓存并返回
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
} else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isTraceEnabled()) {
logger.trace(
"Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
}
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// 默认非public方法不支持
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// 获取最具体的方法(桥接方法处理),确保是目标类中的实现方法
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// 先解析方法上的解析事务注解
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// 方法上没有,在解析类上的事务注解
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
// === 此时 specificMethod ≠ method(说明原始 method 是接口定义) ===
// 再尝试解析接口方法本身及其声明类(即接口)上的事务注解
if (specificMethod != method) {
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
}
TransactionInterceptor
TransactionInterceptor 是用于处理声明式事务(@Transactional 注解)的 AOP 拦截器,本质上是一个环绕(Around)通知,拦截被 @Transactional 标注的方法。
它自身并不直接实现事务的提交、回滚等核心逻辑,而是将这些操作委托给底层的PlatformTransactionManager 来完成。
主要职责
- 从 TransactionAttributeSource 中获取事务属性(如传播行为、隔离级别、是否只读等)
- 调用 TransactionManager 获取或创建事务
- 执行目标方法,捕获异常判断是否回滚
- 正常返回时提交事务,异常时进行回滚
- 管理当前事务上下文(通过 ThreadLocal)以支持事务传播
关键源码
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
// =============== 父类TransactionAspectSupport中的字段 =================
private static final ThreadLocal<TransactionInfo> transactionInfoHolder = new NamedThreadLocal<>(
"Current aspect-driven transaction");
/**
* AOP拦截器核心逻辑:拦截事务方法,并根据事务配置(声明式 or 编程式)处理事务生命周期
*/
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取beanFactory中的TransactionManager(可能是DataSourceTransactionManager、JtaTransactionManager等)
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 构造方法唯一标识(例如:site.shanzhao.UserService.save)
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务处理(@Transactional注解就走这)
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务信息对象,必要时启动事务
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行被增强的方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// 处理异常回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 清理ThreadLocal中的事务信息
cleanupTransactionInfo(txInfo);
}
// 方法正常返回则提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else { // 编程式事务处理
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
try {
result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
} catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new ThrowableHolderException(ex);
}
} else {
throwableHolder.throwable = ex;
return null;
}
} finally {
cleanupTransactionInfo(txInfo);
}
});
} catch (ThrowableHolderException ex) {
throw ex.getCause();
} catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
} catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
/**
* 根据事务属性创建事务(如果有必要)。
*
* - 若当前方法配置了事务属性(如 @Transactional),则获取事务管理器并尝试开启事务。
* - 若未指定事务名称,则使用方法签名作为事务名称(便于日志跟踪和调试)。
* - 最终返回一个封装了事务状态的 TransactionInfo 对象(用于后续提交或回滚处理)。
*
*/
protected TransactionInfo createTransactionIfNecessary(
@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr,
final String joinpointIdentification) {
// 如果没有设置事务名称,则使用方法签名作为名称,并封装成 DelegatingTransactionAttribute。
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
// 若存在事务属性,表示方法需要事务处理
if (txAttr != null) {
if (tm != null) {
// 调用事务管理器开启事务,返回事务状态(可能是新事务,也可能是参与已有事务)
status = tm.getTransaction(txAttr);
} else {
// 没有配置事务管理器,无法执行事务控制,打印调试日志
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 构造 TransactionInfo 对象,并将其绑定到父类TransactionAspectSupport#ThreadLocal 中,
// 用于在线程内部保存当前事务的上下文信息(如 TransactionStatus、事务属性等),
// 以支持事务传播、回滚控制和资源清理等操作
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
/**
* 在目标方法抛出异常后完成事务处理(回滚或提交)
*
* 该方法根据事务属性(如 @Transactional 中的 rollbackFor)判断是否需要回滚,
* 若不需要回滚则尝试提交(注意:提交时内部仍可能因为标记了 rollbackOnly 而实际执行回滚)。
*
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
// 命中回滚异常,需要操作事务回滚
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// 出现了异常但不需要回滚。则尝试commit(不一定真的会commit,内部还是可能根据rollbackOnly来判断是否rollback)
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
}
PlatformTransactionManager
PlatformTransactionManager 抽象了事务的获取、提交、回滚,是 Spring 事务架构的核心入口。所有 @Transactional 的底层实现最终都依赖它完成真正的事务操作。基本实现类为DataSourceTransactionManager。
TransactionStatus
默认实现为DefaultTransactionStatus。每个@Transactional注解都会生成一个DefaultTransactionStatus,用于表示当前事物的允许状态。在事务执行流程中,它作为事务的运行时上下文贯穿始终,PlatformTransactionManager 对其进行更新和查询,以决定事务的提交、回滚及其他控制逻辑。
其中的isNewTransaction()方法决定了当前@Transactional生成的事物是否有资格进行commit或rollback
public class DefaultTransactionStatus extends AbstractTransactionStatus {
/**
* 当前事务对象,一般为具体事务实现(如 DataSourceTransactionObject)。
*
* - 若当前方法运行在事务上下文中,则 非null;
* - 若未开启事务(如事务传播行为为 NOT_SUPPORTED),则为 null;
*
* 注意:即使在同一个物理事务中,每个 @Transactional 方法对应的 DefaultTransactionStatus 实例不同,
* 但它们内部的 transaction(如 ConnectionHolder)可能是同一个对象,表示共享同一底层连接。
*/
@Nullable
private final Object transaction;
/**
* 是否是“新事务”的创建者。
*
* - true:当前方法通过传播行为触发了事务创建;
* - false:当前方法加入了已有事务;
*
* 注意:该值为 true 不等于事务实际存在,需结合 transaction 字段判断是否真有事务资源。
*/
private final boolean newTransaction;
/**
* 是否注册了 TransactionSynchronizationManager,同步管理器用于事务钩子(如 afterCommit)。
* 即是否是本方法负责事务同步的初始化(如绑定资源、触发同步回调)。
*/
private final boolean newSynchronization;
/**
* 当前事务是否只读,由 @Transactional 配置
*/
private final boolean readOnly;
/**
* 当前日志是否是debug等级以上
*/
private final boolean debug;
/**
* 很重要
* 如果当前事务为嵌套事务或需要挂起上一个事务,则用于保存被挂起的事务资源。
*
* 一般为 SuspendedResourcesHolder 类型,用于在当前事务完成后恢复之前的事务状态。
*/
@Nullable
private final Object suspendedResources;
// ============ 父类AbstractTransactionStatus中的字段 ============
/**
* 标识当前事务是否被标记为回滚(通常通过 setRollbackOnly() 触发)。
*/
private boolean rollbackOnly = false;
/**
* 当前事务是否已完成(无论提交还是回滚)。
*/
private boolean completed = false;
/**
* 用于保存事务的保存点(Savepoint),支持嵌套事务回滚。
*/
@Nullable
private Object savepoint;
/**
* 很重要的方法,决定了当前@Transactional生成的事物是否有资格进行commit或rollback
*/
public boolean isNewTransaction() {
return (hasTransaction() && this.newTransaction);
}
}
DataSourceTransactionManager
Spring 默认提供的基于 JDBC 数据源(javax.sql.DataSource
)的 PlatformTransactionManager
实现
核心方法getTransaction
流程总结
- 首先判断当前线程是否已绑定事务(即
transactionActive
状态);若存在事务,则代表当前可能处于嵌套或参与中 事务的处理场景 - 根据
TransactionDefinition
中指定的事务传播行为(PROPAGATION_*
)来决定事务处理策略:- 是否加入当前事务
- 是否挂起已有事务
- 是否开启新事务
- 最终构建
DefaultTransactionStatus
实例,其中封装了当前事务的控制信息(如是否新建、是否可回滚、是否只读等) - 如果事务被创建(非参与),还会:
- 设置数据库连接的自动提交为 false
- 绑定资源到当前线程(ThreadLocal)
- 注册事务同步管理器(
TransactionSynchronizationManager
)以支持hook函数的调用(如 beforeCommit、afterCompletion 等)
关键源码
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
/**
* 事物的获取(父类中AbstractPlatformTransactionManager的方法 )
*/
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 获取当前事务上下文对象,默认为 DataSourceTransactionObject
// 初次进入时内部的 ConnectionHolder 为空(即还未绑定连接)
Object transaction = doGetTransaction();
// 缓存日志等级判断,避免多次调用 logger.isDebugEnabled()
boolean debugEnabled = logger.isDebugEnabled();
// 若无自定义事务定义,使用默认配置(PROPAGATION_REQUIRED,ISOLATION_DEFAULT 等)
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
// ========== 检查当前线程是否已存在事务 ========== //
// 条件:当前线程绑定了 ConnectionHolder 且其 transactionActive = true
if (isExistingTransaction(transaction)) {
// 已存在事务,根据传播行为判断处理方式
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// ========== 走到这表示还没有事物,根据传播行为判断是否需要新建事务 ========== //
// 超时时间校验:不能小于默认值(-1)
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// ========== 根据事务传播行为做决策 ========== //
// 传播行为为 MANDATORY,但当前又没有事务,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 传播行为为 REQUIRED、REQUIRES_NEW、NESTED 都需要新建事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 先挂起当前事务(无事务时挂空)
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 判断是否启用事务同步,默认是 true(除非设置为 NEVER)
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构建 DefaultTransactionStatus,封装事务上下文信息
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 初始化事务:如创建连接、设置隔离级别、timeout,并将资源绑定到当前线程
doBegin(transaction, definition);
// 注册事务同步器(如触发 beforeCommit、afterCompletion 等回调)
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error ex) {
// 异常时恢复挂起的事务资源
resume(null, suspendedResources);
throw ex;
}
}
// 传播行为为 SUPPORTS、NOT_SUPPORTED、NEVER —— 不启动实际事务,只做同步处理(如果配置了)
else {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
// 是否启用事务同步,SYNCHRONIZATION_ALWAYS 时启用
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 创建一个“空事务”状态,主要用于注册同步器,但没有实际事务操作
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// NEVER:完全不支持事务,当前线程存在事务则抛异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// NOT_SUPPORTED:暂停(挂起)当前事务,然后以非事务的方式运行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// REQUIRES_NEW:暂停当前事务,新开一个事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 暂停当前事务,方便之后还原
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// NESTED:先判断是否允许嵌套事务(默认允许),然后优先通过数据库保存点(Savepoint)实现
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) { // 默认走这,使用数据库Savepoint模拟嵌套事务
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false,
debugEnabled, null);
status.createAndHoldSavepoint();
return status;
} else { // 少数情况使用真正的嵌套事务,如JTA中可能嵌套 begin/commit 调用
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
// 传播策略为SUPPORTS 或 REQUIRED 或 MANDATORY
// 这三个对已存在的事务处理方式是一致的,啥也不做,也就是直接加入已存在的事务
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
// 校验当前传播策略的隔离等级和是否只读与已存在的事务是否一致,不过默认不校验。
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: "
+
(currentIsolationLevel != null
? isoConstants.toCode(currentIsolationLevel,
DefaultTransactionDefinition.PREFIX_ISOLATION)
: "(unknown)"));
}
}
// 校验只读属性
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 创建事务状态对象,参与已存在事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
/**
* 开启事物
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 是否需要新建 Connection:满足任意一个条件即可
// 1. 当前没有 ConnectionHolder(首次创建事务)
// 2. 当前 ConnectionHolder 已参与过其他事务(如嵌套事务 REQUIRES_NEW 中旧连接已同步过)。此时不能复用旧连接,必须重新获取
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 标记当前连接已参与事务同步
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 根据事务定义设置隔离级别,并记录旧值,事务完成后会恢复
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 当前Connection设为手动提交。此时把事物提交的控制权交给了Spring
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// readOnly 设置
prepareTransactionalConnection(con, definition);
// 标记事务为“激活”状态(事务已开启)
txObject.getConnectionHolder().setTransactionActive(true);
// 设置事务超时时间(单位:秒)
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 将当前 ConnectionHolder 绑定到线程上下文中(核心 ThreadLocal 操作)
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
// 如果创建连接过程中出现异常,释放资源并清理 ConnectionHolder
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
/**
* 事物的commit判断,根据rollbackOnly字段也可能走rollback。(父类中AbstractPlatformTransactionManager的方法)
*/
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) { // 事务已完成(提交或回滚),不允许再次提交或回滚,属于非法状态
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 标记为rollbackOnly的走强制回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
// 当前事务被标记为全局回滚,说明内部事务标记为回滚,但外层需要commit。
// 所以会先回滚,再抛UnexpectedRollbackException异常
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 出现了意外(外层commit,但内层rollback)unexpected为true
processRollback(defStatus, true);
return;
}
// 正常事务提交流程
// 1. 调用 beforeCommit 和 beforeCompletion 钩子
// 2. 如果存在保存点(Savepoint)则处理嵌套事务
// 3. 真正调用底层数据库事务管理器进行提交
// 4. 调用 afterCommit 和 afterCompletion 钩子
// 5. 捕获并处理提交中的异常
processCommit(defStatus);
}
}
PROPAGATION总结
肯定派,必须存在事务
传播行为 说明 PROPAGATION_REQUIRED
(默认)不存在就新建,存在就加入 PROPAGATION_REQUIRES_NEW
始终新建一个事务,且与旧事物完全隔离。也就至少有两此commit或rollback。存在原事务时会将原事务封装为SuspendedResourcesHolder,再重新获取一个新的数据库连接开启事务,等这个新事物运行完,再把SuspendedResourcesHolder复原 PROPAGATION_NESTED
不存在就新建,存在就新开一个嵌套的事务。(先检查nestedTransactionAllowed,为false就抛异常了,代表不支持嵌套事务。不过默认为true。)和PROPAGATION_REQUIRES_NEW的区别在于这是用数据库的Savepoint实现,至始至终只会存在一个事务,如果当前回滚,也只会退回到Savepoint,不会对外层的事务造成影响,如果都能提交,最终也只有一次真正的事务commit。所以,注册的TransactionSynchronization hook函数也只会等待整个事务的结束来回调。如果不支持安全点(JTA),那实现就完全等于PROPAGATION_REQUIRES_NEW PROPAGATION_MANDATORY
支持已存在的事务,不存在则直接抛异常 中立派,不一定有事务
传播行为 说明 PROPAGATION_SUPPORTS
源码中并没有处理这种的传播策略。所以,它的作用是:存在事务就加入,不存在就以非事务的方式运行 否定派,不支持事务
传播行为 说明 PROPAGATION_NOT_SUPPORTED
以非事务的方式运行(有事务就暂停事务,重新获取一个没有事物的Connection使用。没有就啥也不做),实现和上面的差不多。但默认仍会注册事物同步器,让当前这个事物始终用同一个Connection PROPAGATION_NEVER
强制性的不支持事务,要是当前存在事务,就直接抛异常
TransactionSynchronizationManager
是Spring事物体系中的一个辅助工具,维护线程级事务上下文信息,保证事务行为在多层调用、事务传播、资源管理等场景中保持一致。
Spring 的事务是通过 AOP 管理的跨方法事务逻辑。Spring 需要有个全局但又线程安全的上下文去维护事务信息。如果没有它,则:
不同方法无法共享事务资源(比如同一个 JDBC Connection)
无法实现事务传播(比如事务嵌套时决定是否复用原事务)
无法注册和触发事务的生命周期回调(如
beforeCommit
、afterCompletion
)
关键字段
public abstract class TransactionSynchronizationManager {
// 绑定到当前线程的资源,比如数据库连接(key 为 DataSource ,value 为ConnectionHolder)
// Spring 在事务开始时绑定,在事务完成后解除绑定。
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
// 当前线程注册的事务同步器集合(TransactionSynchronization),
// 用于在事务生命周期各个阶段(如提交前、提交后、回滚后)执行hook回调。
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>(
"Transaction synchronizations");
// 当前事务的名称(来自 TransactionDefinition#getName),通常用于日志记录和调试。
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>(
"Current transaction name");
// 当前事务是否被声明为只读,影响底层连接是否设置为只读模式(如 Connection#setReadOnly(true))。
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>(
"Current transaction read-only status");
// 当前事务的隔离级别(来自 TransactionDefinition)
// 决定数据库访问的一致性级别,例如 READ_COMMITTED、REPEATABLE_READ 等。
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>(
"Current transaction isolation level");
// 当前事物是否被激活(即运行在有事物的情况下),不一定有commit或rollback的资格
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>(
"Actual transaction active");
}
DataSourceUtils
前面我们分析了 Spring 事务的整体流程,但还有1个关键问题:在实际项目中,Spring 很少直接作为持久化框架,MyBatis、Hibernate 等才是执行数据库操作的主力。那么事务是 Spring 管的,数据库连接却是由 MyBatis 获取的,两者如何协同?
这就引出了 DataSourceUtils
的作用:为持久层框架提供“Spring 管控下的连接”,确保它们获取到的 Connection
是当前事务中绑定的那个,这样事务控制(包括 commit/rollback)才能正确生效。
核心源码
public abstract class DataSourceUtils {
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
// 1. 尝试从 TransactionSynchronizationManager 中获取线程绑定的 ConnectionHolder
// 只要TransactionSynchronizationManager.isSynchronizationActive(),都应该获取的是同一个ConnectionHolder
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
// 2. 如果当前线程中存在已关联的 ConnectionHolder 且已经持有连接,或者它被事务标记为已同步
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
// 引用计数
conHolder.requested();
// 如果还没有设置具体的连接,则新建连接并设置到 holder 中(resume 的场景)
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
// =============== 否则:没有事务,或没有绑定连接的事务 ==================
logger.debug("Fetching JDBC Connection from DataSource");
// 3. 获取一个新的物理连接(不在事务中或事务尚未绑定连接)
Connection con = fetchConnection(dataSource);
// 如果当前事物的事物同步器被激活,则将上面获取的Connection也绑定到事物资源里。让当前事物始终用用一个Connection
// 就算当前事物是NOT_SUPPORTED,默认也会缓存ConnectionHolder
if (TransactionSynchronizationManager.isSynchronizationActive()) {
try {
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
} else {
holderToUse.setConnection(con);
}
holderToUse.requested();
// 注册一个事物同步器用于hook
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
// 标记该连接已参与事务同步
holderToUse.setSynchronizedWithTransaction(true);
// 第一次调用时才绑定:DataSource -> ConnectionHolder
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
} catch (RuntimeException ex) {
releaseConnection(con, dataSource);
throw ex;
}
}
return con;
}
}