JAVASpring

一篇文章彻底搞懂 Spring 的事务

转载:一篇文章彻底搞懂 Spring 的事务

引言


作为技术人,很忌讳对某个问题浅尝辄止,一知半解,特别是经常使用的技术。Spring 作为经久不衰的 Java 框架,自然少不了对事务的支持。那么,关于 Spring 的事务,你了解多少呢?如果只是知道在方法上加一个 @Transactional 注解就可以支持事务,或者说只是简单地知道 Spring 声明式事务的背后原理是 AOP,恐怕还不够。今天我们就一起深入了解下 Spring 的事务。

一、Srping 中如何开启事务


Spring 中开启事务有两种方式,一种是 声明式事务,另一种是编程式事务

 1. 声明式事务

@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void test() {
    // 业务逻辑
}

2. 编程式事务

使用 TransactionTemplate等类和 API 手动管理事务,控制事务的新建、提交、回滚等过程。

  • TransactionTemplate
@Resource
private TransactionTemplate transactionTemplate;

@Transactional
public void test() {
    transactionTemplate.executeWithoutResult(status -> {
        // 业务逻辑
        if (something not right) {
            // 回滚
            status.setRollbackOnly();
        }
    });
}

• TransactionManager

@Resource
private PlatformTransactionManager transactionManager;

@Transactional
publicvoidtest() {
    // 定义事务
    TransactionDefinitiontransactionDefinition=newDefaultTransactionDefinition();
    // 获取事务状态
    TransactionStatustransactionStatus= transactionManager.getTransaction(transactionDefinition);

    try {
        // 业务操作
        // 提交事务
        transactionManager.commit(transactionStatus);
    } catch (Exception e) {
        // 异常回滚事务
        transactionManager.rollback(transactionStatus);
        throw e;
    }
}

二、为什么不建议用声明式事务?


上面的两种事务实现方式,明显声明式事务 更简单更简洁,对业务也没有侵入性。但为什么说不建议用声明式事务呢?

当然这个也不是笔者建议的,而是阿里巴巴《Java开发手册 v1.5.0 华山版》 建议的。

为什么它要这么建议呢? 答案肯定是声明式事务 存在着让人难以忽略的缺点。

事务粒度

首先最容易想到的应该是事务的粒度问题。

因为声明式事务能控制的最小粒度方法,整个方法都包含在事务内,但有时这并不是我们想要的,我们希望事务粒度能更小一点,比如说只有某几行数据库操作才需要事务。

比如,如果我们将 RPC调用 放入事务方法中,如果事务提交失败,数据库最后回滚,但是RPC调用 无法回滚,这就导致了严重的数据不一致 问题。

又比如,我们在事务方法中加入太多耗时操作,例如文件操作,更新缓存,发送消息等,就会让事务变成一个长事务数据库连接会长时间地被占用,就可能导致数据库连接池耗尽,也更容易产生死锁问题。

更为关键的是,由于这种事务注解很容易被人忽略,并且还存在方法嵌套,所以上面的问题很容易“防不慎防”。相反,使用编程式事务,能让开发者很清楚地明确事务的边界

事务失效

另外一个比较重要的问题就是,由于开发者的疏忽或者技术水平的原因,可能会导致声明式事务失效

在讲 事务失效 之前,我们需要简单了解一下 Spring 声明式事务的原理是什么,这里后面会深入探究,这里先简单概述一下:

Spring 声明式事务通过AOP实现,基于@Transactional注解和事务管理器。Spring使用代理模式(JDK动态代理或CGLIB)拦截带有@Transactional 的方法调用,在方法执行前获取事务配置,启动事务;若方法成功执行,提交事务;若发生异常,根据配置回滚事务。这些流程由 TransactionInterceptor 调用PlatformTransactionManager 完成,从而实现自动管理事务边界。

既然 Spring声明式事务 的实现依赖于 AOP,那么按照道理说,所有能到导致 AOP 失效 的情况也都会导致声明式事务事务实效

那么又是哪些情况会导致 AOP 失效呢?

这就要继续追问,AOP 的原理是什么?

Spring 中AOP 的原理就是 JDK动态代理和 CGLIB 代理,有接口的使用 JDK动态代理,没接口的使用 CGLIB 代理

所以,以下情况不走代理或者无法代理的情况会导致 AOP 失效,从而导致 事务失效

  • 内部调用
    • • 类内部方法调用直接调用原始对象,根本不涉及代理对象,所以事务必然失效,但是也可以通过依赖注入自己来规避。
  • • 非 public 方法、final 方法、静态方法
    • • 这是一个常被忽略的点,动态代理无法代理非public方法final方法静态方法,所以这些方法上的事务也会失效。

除了上面讲到的那些AOP的原因,还有一些跟 Spring 事务属性配置相关的。

  • • rollbackFor 设置错误
    • • 这个比较好理解,就是发生异常和设置异常不匹配,导致事务未回滚。
  • • propagation 设置错误
    • • 这个等下深入探讨

其实上面讲的都是可能因为开发人员人为疏忽导致的事务问题,但是正是因为谁也不能保证开发人员不会犯错,水平极高,所以只能通过一些开发规范来尽量规避这些问题,使用编程式事务,就能够大大减少上述问题的发生。当然,这也不是说声明式事务完全不能用,只是说不能滥用。

三、深入探究声明式事务的原理


其实刚才还漏讲了一个可能导致事务失效的原因:多线程跨线程的事务管理 Spring 的声明式事务也是不支持的。

这里,我们就要开始从声明式事务的原理,也就是它是怎么实现的开始讲起了。

我们在上文中简单提到了 AOP 和 事务管理器 - TransactionManager

我们可以从源码获取更多细节。

1、入口类:TransactionAspectSupport

public abstractclassTransactionAspectSupport {
    
    // 核心方法: 在事务环境中执行目标方法
    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, InvocationCallback invocation)throws Throwable {
        // 1. 获取事务属性
        TransactionAttributeSourcetas= getTransactionAttributeSource();
        TransactionAttributetxAttr= (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        
        // 2. 确定事务管理器
        TransactionManagertm= determineTransactionManager(txAttr, targetClass);
        
        // 3. 处理响应式事务
        if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
            // 处理响应式事务逻辑...
            return handleReactiveTransaction(/*...*/);
        }
        
        // 4. 处理普通事务 使用 PlatformTransactionManager
        PlatformTransactionManagerptm= asPlatformTransactionManager(tm);
        StringjoinpointIdentification= methodIdentification(method, targetClass, txAttr);
        
        // 5. 执行事务处理
        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 标准事务处理流程:
            TransactionInfotxInfo= createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
            ObjectretVal=null;
            try {
                // 执行目标方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                if (txInfo != null) {
                    txInfo.restoreThreadLocalStatus();
                }
            }
            // 提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        else {
            // 回调式事务处理...
        }
    }
}

从上面我们可以很清晰地看到整体事务处理流程。

我们先从 "5. 执行事务处理" 中的很重要的一个类 TransactionInfo 说起,看下它里面存的是什么?

protected staticfinalclassTransactionInfo {
    // 事务管理器
    privatefinal PlatformTransactionManager transactionManager;
    // 事务属性(传播行为、隔离级别等配置)
    privatefinal TransactionAttribute transactionAttribute;
    // 方法标识(用于日志)
    privatefinal String joinpointIdentification;
    // 当前事务状态
    private TransactionStatus transactionStatus;
    // 父事务信息
    private TransactionInfo oldTransactionInfo;
}

TransactionInfo 存储的就是事务信息,它的主要作用是:

  • • 事务上下文信息的封装
    • • 保存当前事务的完整上下文信息
    • • 包括事务管理器事务属性配置事务状态
    • • 作为事务执行过程中的信息载体

不过,这个事务信息 TransactionInfo 保存在哪里呢?这是一个很重要的问题。

我们来梳理下 TransactionInfo 的需求:

  1. 1. 每个线程都有自己独立的事务上下文,事务跟当前调用线程息息相关。
  2. 2. 事务可以设置各种传播属性(REQUIRED、REQUIRES_NEW等),也就是同一线程内的方法调用应该可以很方便地访问当前事务信息,从而决定是否新建事务,换句话说,也就是 事务可以嵌套传播
  3. 3. 另外,我们也不希望在方法调用链中显式传递事务信息

从上面的需求,我们自然而然地想到了Java中的 ThreadLocal,它用来保存事务信息再合适不过,事实也的确如此。

我们看下 prepareTransactionInfo() -> bindToThread() ,其实就是将当前事务信息 TransactionInfo 保存到 ThreadLocal 中。

// 父事务信息
private TransactionInfo oldTransactionInfo;

// 保存当前调用线程的事务信息
privatestaticfinal ThreadLocal<TransactionInfo> transactionInfoHolder = 
      newNamedThreadLocal<>("Current aspect-driven transaction");

// 将当前事务,放入事务上下文 ThreadLocal
privatevoidbindToThread() {
    // 暂存父事务到 oldTransactionInfo
    this.oldTransactionInfo = transactionInfoHolder.get();
    // 保存当前事务到 ThreadLocal
    transactionInfoHolder.set(this);
}

这里我们也就明白了为什么一开始说,多线程也会导致声明式事务失效,因为 ThreadLocal 保存的内容不能跨线程

接着我们继续回到 TransactionInfo,它里面其它几个属性非常好理解,但是其中 oldTransactionInfo 让人觉得有点奇怪,它到底有什么用?其实它非常重要,具有特殊作用。

我们先看下面这个场景:

@Transactional
publicvoidouter() {
    // 创建 TransactionInfo1
    inner();  // 调用内层事务方法
    // 恢复到 TransactionInfo1
}

@Transactional
publicvoidinner() {
    // 创建 TransactionInfo2
    // oldTransactionInfo 指向 TransactionInfo1
    // 方法结束时恢复到 TransactionInfo1
}

看到这里是不是恍然大悟,在这种方法嵌套中,要怎么处理父事务子事务 ?答案就在 oldTransactionInfo 属性。

private void bindToThread() {
    // 取出 transactionInfoHolder(ThreadLocal) 中的父事务 TransactionInfo1 到 oldTransactionInfo
    this.oldTransactionInfo = transactionInfoHolder.get();
    // 将新事务 TransactionInfo2 保存到 transactionInfoHolder
    transactionInfoHolder.set(this);
}

private void restoreThreadLocalStatus() {
    // 处理完了 TransactionInfo2,恢复之前保存的父事务 TransactionInfo1
    transactionInfoHolder.set(this.oldTransactionInfo);
}

小结: oldTransactionInfo 的作用

  • • 事务现场保护和支持事务嵌套
    • • 在嵌套事务场景中,保存外层事务的信息。这样确保内层事务执行完成后,可以恢复到外层事务的上下文。
  • • 事务上下文的完整性
    • • 维护 ThreadLocal 中事务信息 TransactionInfo 的完整性
    • • 形成事务信息的链式结构,支持多层事务嵌套

接着我们再回到入口处,看下 createTransactionIfNecessary 方法,这个方法也比较关键,从中可以看出我们的事务是如何创建的,以及在注解中设置的 propagation 属性在这里会起什么作用。

protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
        TransactionAttribute txAttr, final String joinpointIdentification) {

    // 如果事务没有指定名称,使用方法名称作为事务名称
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = newDelegatingTransactionAttribute(txAttr)....
    }

    // 从事务管理器获取事务状态
    TransactionStatusstatus=null;
    // 事务属性不为空
    if (txAttr != null) {
        //存在事务管理器
        if (tm != null) {
            // 根据指定的传播行为,返回当前活跃的事务或者新建一个事务
            status = tm.getTransaction(txAttr);
        }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

我们先跳过

PlatformTransactionManager#getTransaction

先看下

TransactionAspectSupport#prepareTransactionInfo

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    // 1. 创建事务信息对象
    TransactionInfotxInfo=newTransactionInfo(tm, txAttr, joinpointIdentification);
    
    if (txAttr != null) {
        // 2. 如果有事务属性配置,说明需要事务
        // 3. 设置事务状态
        txInfo.newTransactionStatus(status);
    }
    else {
        // 4. 没有事务属性,说明不需要事务
    }

    // 5. 重要:总是绑定到ThreadLocal
    txInfo.bindToThread();
    return txInfo;
}

prepareTransactionInfo 主要就是创建 TransactionInfo 对象(包含:事务管理器事务属性方法标识符等),维护事务状态,并且绑定到 ThreadLocal 。

事务管理器:PlatformTransactionManager

现在继续深入到

PlatformTransactionManager#getTransaction(TransactionDefinition)

PlatformTransactionManager 其实是个接口,所以要看继承它的抽象类:

AbstractPlatformTransactionManager#getTransaction

public final TransactionStatus getTransaction(TransactionDefinition definition)
            throws TransactionException {

    // 如果没有事务定义使用默认值
    TransactionDefinitiondef= (definition != null ? definition : TransactionDefinition.withDefaults());

    // 获取事务,不同的ORM框架(JDBC、JPA、Hibernate等)获取事务的方式可能不同,交由子类去实现
    Objecttransaction= doGetTransaction();

    // 如果当前已经存在事务,并且该事务处于活跃状态
    if (isExistingTransaction(transaction)) {
        // 根据设置的传播行为决定如何处理
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // 为新事务检查超时时间设置
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        thrownewInvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // 当前不存在事务,根据设置的传播行为决定如何处理,如果设定为 PROPAGATION_MANDATORY,抛出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        thrownewIllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 如果是其它的,开启一个新事务
    elseif (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHoldersuspendedResources= suspend(null);
    
        try {
            // 开启新事务
            return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            // 恢复挂起资源
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // 创建“空”事务:没有实际事务,但可能会有事务同步器。
        booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

接着看看 handleExistingTransaction ,看下如果当前事务上下文存在活跃事务会如何处理。

private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // 如果传播行为是 PROPAGATION_NEVER,抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        thrownewIllegalTransactionStateException...
    }

    // 如果传播行为是 PROPAGATION_NOT_SUPPORTED,直接在非事务中运行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // 挂起当前事务
        ObjectsuspendedResources= suspend(transaction);
        booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 第二个参数传递null,表示不需要事务
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // 如果是 PROPAGATION_REQUIRES_NEW,
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // 挂起当前事务
        SuspendedResourcesHoldersuspendedResources= suspend(transaction);
        try {
            // 开启新事务
            return startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            // 出现异常,恢复挂起事务
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // 省略
    }

    // PROPAGATION_REQUIRED, PROPAGATION_SUPPORTS, PROPAGATION_MANDATORY:
    // 原事务是否有效
    if (isValidateExistingTransaction()) {
        // 新加入的事务必须与原事务隔离级别相同 否则报错
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            IntegercurrentIsolationLevel= TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                thrownewIllegalTransactionStateException...
            }
        }
        // 当前事务非只读事务 但是已经存在的事务是只读 报错
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                thrownewIllegalTransactionStateException...
            }
        }
    }

    // 事务同步器:提供了事务生命周期的钩子方法 用于资源管理、状态清理、监控等场景
    booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 根据给定参数创建新的事务状态
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

可以看出

AbstractPlatformTransactionManager#getTransaction

会根据不同的事务传播行为做出不同行为,创建新事务,或者使用现有事务,或者挂起当前事务,或者抛出异常等。

就比如PROPAGATION_REQUIRED 和 PROPAGATION_REQUIRES_NEW的在处理上区别如下:

  1. 1. PROPAGATION_REQUIRED
  • • 不挂起当前事务
  • • 不创建新事务
  • • 返回的 TransactionStatus 中 transaction 为 现有事务
  1. 2. PROPAGATION_REQUIRES_NEW
  • • 挂起当前事务
  • • 创建新事务
  • • 返回的 TransactionStatus 中 transaction 为 新事务

这里的返回值 TransactionStatus 其实就是 事务状态,会作为参数传给上面的 prepareTransactionInfo() 方法。

public classDefaultTransactionStatusextendsAbstractTransactionStatus {
    // 事务名称
    privatefinal String transactionName;
    // 事务
    privatefinal Object transaction;
    // 是否是新事务
    privatefinalboolean newTransaction;
    // 是否是新的事务同步器
    privatefinalboolean newSynchronization;
    // 是否嵌套
    privatefinalboolean nested;
    // 是否只读
    privatefinalboolean readOnly;
    // debug标记
    privatefinalboolean debug;
    // 事务挂起暂存的资源
    privatefinal Object suspendedResources;
}

总结:声明式事务实现原理和过程

一、核心组件

  • • TransactionAspectSupport:事务切面支持类
  • • PlatformTransactionManager:事务管理器
  • • TransactionInfo:事务信息载体
  • • ThreadLocal:事务上下文存储器

二、实现过程

整体流程图:

流程说明

  1. 1. 事务拦截
    • • 通过 AOP 拦截带有 @Transactional 注解的方法,调用 TransactionAspectSupport.invokeWithinTransaction()
  2. 2. 事务准备
    • • 获取事务属性(TransactionAttribute)
    • • 确定事务管理器(PlatformTransactionManager)
    • • 事务上下文管理
      • • 创建 TransactionInfo 并保存事务状态
      • • 根据传播行为 决定是否创建新事务或者挂起当前事务
    • • 通过 ThreadLocal 存储事务信息
    • • 支持事务嵌套(通过 oldTransactionInfo)
  3. 3. 方法执行
try {
    // 执行业务方法
    retVal = invocation.proceedWithInvocation();
    // 提交事务
    commitTransactionAfterReturning();
} catch (Exception ex) {
    // 回滚事务
    completeTransactionAfterThrowing();
    throw ex;
} finally {
    // 清理事务信息
    cleanupTransactionInfo();
}

四、场景题


最后,我们出一个场景题来看下你对刚才的事务的传播行为的理解。

@Service
publicclassDemoServiceA {

    @Resource
    private DemoServiceB demoServiceB;

    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    publicvoida() {
        // 步骤A:写数据库1
        // 步骤B:写数据库2
        // 步骤C:调用 DemoServiceB.b() 方法读数据
        vardata= demoServiceB.b();
    }

}

@Service
publicclassDemoServiceB {

    @Transactional(rollbackFor = Exception.class, propagation = Propagation.?)
    public Object b() {
        // read data from db
    }

}

在上面的场景中,demoServiceB 的 b() 方法的传播行为-propagation 应该设置为什么呢?

答案是应该设置为:

Propagation.NOT_SUPPORTED

因为这样的话,如果步骤C 读取数据失败,不会导致步骤A 和 步骤B 中数据修改回滚

那如果是下面这样呢?

调用 demoServiceB.b() 步骤在中间。

@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void a() {
      // 步骤A:写数据库1
      // 步骤B:调用 DemoServiceB.b() 方法读数据
      var data = demoServiceB.b();
      // 步骤C:写数据库2
}

答案是应该设置为:

Propagation.REQUIRED

因为这样的话,如果步骤B 读取数据失败,步骤C 还没开始,步骤A 修改了的数据应该回滚的。

希望到这里,你对 Spring 的事务原理和事务的使用有更多的认识。