1. 程式人生 > 程式設計 >這一次搞懂Spring事務是如何傳播的

這一次搞懂Spring事務是如何傳播的

前言

上一篇分析了事務註解的解析過程,本質上是將事務封裝為切面加入到AOP的執行鏈中,因此會呼叫到MethodInceptor的實現類的invoke方法,而事務切面的Interceptor就是TransactionInterceptor,所以本篇直接從該類開始。

正文

事務切面的呼叫過程

 public Object invoke(MethodInvocation invocation) throws Throwable {
 // Work out the target class: may be {@code null}.
 // The TransactionAttributeSource should be passed the target class
 // as well as the method,which may be from an interface.
 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

 // Adapt to TransactionAspectSupport's invokeWithinTransaction...
 return invokeWithinTransaction(invocation.getMethod(),targetClass,invocation::proceed);
 }

這個方法本身沒做什麼事,主要是呼叫了父類的invokeWithinTransaction方法,注意最後一個引數,傳入的是一個lambda表示式,而這個表示式中的呼叫的方法應該不陌生,在分析AOP呼叫鏈時,就是通過這個方法傳遞到下一個切面或是呼叫被代理例項的方法,忘記了的可以回去看看。

 protected Object invokeWithinTransaction(Method method,@Nullable Class<?> targetClass,final InvocationCallback invocation) throws Throwable {

 // If the transaction attribute is null,the method is non-transactional.
 //獲取事務屬性類 AnnotationTransactionAttributeSource
 TransactionAttributeSource tas = getTransactionAttributeSource();

 //獲取方法上面有@Transactional註解的屬性
 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method,targetClass) : null);

 //獲取事務管理器
 final PlatformTransactionManager tm = determineTransactionManager(txAttr);
 final String joinpointIdentification = methodIdentification(method,txAttr);

 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
 TransactionInfo txInfo = createTransactionIfNecessary(tm,txAttr,joinpointIdentification);
 Object retVal = null;
 try {
 // 呼叫proceed方法
 retVal = invocation.proceedWithInvocation();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務回滾
 completeTransactionAfterThrowing(txInfo,ex);
 throw ex;
 }
 finally {
 cleanupTransactionInfo(txInfo);
 }
 //事務提交
 commitTransactionAfterReturning(txInfo);
 return retVal;
 }
 
 // 省略了else
 }

這個方法邏輯很清晰,一目瞭然,if裡面就是對宣告式事務的處理,先呼叫createTransactionIfNecessary方法開啟事務,然後通過invocation.proceedWithInvocation呼叫下一個切面,如果沒有其它切面了,就是呼叫被代理類的方法,出現異常就回滾,否則提交事務,這就是Spring事務切面的執行過程。但是,我們主要要搞懂的就是在這些方法中是如何管理事務以及事務在多個方法之間是如何傳播的。

事務的傳播性概念

傳播性是Spring自己搞出來的,資料庫是沒有的,因為涉及到方法間的呼叫,那麼必然就需要考慮事務在這些方法之間如何流轉,所以Spring提供了7個傳播屬性供選擇,可以將其看成兩大類,即是否支援當前事務:

支援當前事務(在同一個事務中):

PROPAGATION_REQUIRED:支援當前事務,如果不存在,就新建一個事務。

PROPAGATION_MANDATORY:支援當前事務,如果不存在,就丟擲異常。

PROPAGATION_SUPPORTS:支援當前事務,如果不存在,就不使用事務。

不支援當前事務(不在同一個事務中):

PROPAGATION_NEVER:以非事務的方式執行,如果有事務,則丟擲異常。

PROPAGATION_NOT_SUPPORTED:以非事務的方式執行,如果有事務,則掛起當前事務。

PROPAGATION_REQUIRES_NEW:新建事務,如果有事務,掛起當前事務(兩個事務相互獨立,父事務回滾不影響子事務)。

PROPAGATION_NESTED:如果當前事務存在,則巢狀事務執行(指必須依存父事務,子事務不能單獨提交且父事務回滾則子事務也必須回滾,而子事務若回滾,父事務可以回滾也可以捕獲異常)。如果當前沒有事務,則進行與PROPAGATION_REQUIRED類似的操作。

別看屬性這麼多,實際上我們主要用的是PROPAGATION_REQUIRED預設屬性,一些特殊業務下可能會用到PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED。

下面我會假設一個場景,並主要分析這三個屬性。

public class A {
 
 @Autowired
 private B b;

 @Transactional
 public void addA() {
 b.addB();
 }
}

public class B {
 @Transactional
 public void addB() {
 // doSomething...
 }
}



上面我建立了A、B兩個類,每個類中有一個事務方法,使用了宣告式事務並採用的預設傳播屬性,在A中呼叫了B的方法。

當請求來了呼叫addA時,首先呼叫的是代理物件的方法,因此會進入createTransactionIfNecessary方法開啟事務:

 protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,@Nullable TransactionAttribute txAttr,final String joinpointIdentification) {

 // If no name specified,apply method identification as transaction name.
 if (txAttr != null && txAttr.getName() == null) {
 txAttr = new DelegatingTransactionAttribute(txAttr) {
 @Override
 public String getName() {
 return joinpointIdentification;
 }
 };
 }

 TransactionStatus status = null;
 if (txAttr != null) {
 if (tm != null) {
 //開啟事務,這裡重點看
 status = tm.getTransaction(txAttr);
 }
 else {
 }
 }
 //建立事務資訊物件,記錄新老事務資訊物件
 return prepareTransactionInfo(tm,joinpointIdentification,status);
 }

實際上開啟事務是通過AbstractPlatformTransactionManager做的,而這個類是一個抽象類,具體例項化的物件就是我們在專案裡常配置的DataSourceTransactionManager物件。

 public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
 //這裡重點看,.DataSourceTransactionObject拿到物件
 Object transaction = doGetTransaction();

 // Cache debug flag to avoid repeated checks.
 boolean debugEnabled = logger.isDebugEnabled();

 if (definition == null) {
 // Use defaults if no transaction definition given.
 definition = new DefaultTransactionDefinition();
 }

 //第一次進來connectionHolder為空的,所以不存在事務
 if (isExistingTransaction(transaction)) {
 // Existing transaction found -> check propagation behavior to find out how to behave.
 return handleExistingTransaction(definition,transaction,debugEnabled);
 }

 // Check definition settings for new transaction.
 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
 throw new InvalidTimeoutException("Invalid transaction timeout",definition.getTimeout());
 }

 // No existing transaction found -> check propagation behavior to find out how to proceed.
 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
 throw new IllegalTransactionStateException(
 "No existing transaction found for transaction marked with propagation 'mandatory'");
 }
 //第一次進來大部分會走這裡
 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 //先掛起
 SuspendedResourcesHolder suspendedResources = suspend(null);
 if (debugEnabled) {
 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
 }
 try {
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 //建立事務狀態物件,其實就是封裝了事務物件的一些資訊,記錄事務狀態的
 DefaultTransactionStatus status = newTransactionStatus(
 definition,true,newSynchronization,debugEnabled,suspendedResources);

 //開啟事務,重點看看 DataSourceTransactionObject
 doBegin(transaction,definition);

 //開啟事務後,改變事務狀態
 prepareSynchronization(status,definition);
 return status;
 }
 catch (RuntimeException | Error ex) {
 resume(null,suspendedResources);
 throw ex;
 }
 }
 else {
 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
 return prepareTransactionStatus(definition,null,null);
 }
 }

這個方法流程比較長,一步步來看,先呼叫doGetTransaction方法獲取一個DataSourceTransactionObject物件,這個類是JdbcTransactionObjectSupport的子類,在父類中持有了一個ConnectionHolder物件,見名知意,這個物件儲存了當前的連線。

 protected Object doGetTransaction() {
 //管理connection物件,建立回滾點,按照回滾點回滾,釋放回滾點
 DataSourceTransactionObject txObject = new DataSourceTransactionObject();

 //DataSourceTransactionManager預設是允許巢狀事務的
 txObject.setSavepointAllowed(isNestedTransactionAllowed());

 //obtainDataSource() 獲取資料來源物件,其實就是資料庫連線塊物件
 ConnectionHolder conHolder =
 (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
 txObject.setConnectionHolder(conHolder,false);
 return txObject;
 }

追溯getResource方法可以看到ConnectionHolder 是從ThreadLocal裡獲取的,也就是當前執行緒,key是DataSource物件;但是仔細思考下我們是第一次進來,所以這裡肯定獲取不到的,反之,要從這裡獲取到值,那必然是同一個執行緒第二次及以後進入到這裡,也就是在addA呼叫addB時,另外需要注意這裡儲存ConnectionHolder到DataSourceTransactionObject物件時是將newConnectionHolder屬性設定為false了的。

繼續往後,建立完transaction物件後,會呼叫isExistingTransaction判斷是否已經存在一個事務,如果存在就會呼叫handleExistingTransaction方法,這個方法就是處理事務傳播的核心方法,因為我們是第一次進來,肯定不存在事務,所以先跳過。

再往後,可以看到就是處理不同的傳播屬性,主要看到下面這個部分:

 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 //先掛起
 SuspendedResourcesHolder suspendedResources = suspend(null);
 if (debugEnabled) {
 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
 }
 try {
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 //建立事務狀態物件,其實就是封裝了事務物件的一些資訊,記錄事務狀態的
 DefaultTransactionStatus status = newTransactionStatus(
 definition,suspendedResources);
 throw ex;
 }
 }

第一次進來時,PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED都會進入到這裡,首先會呼叫suspend掛起當前存在的事務,在這裡沒啥作用。接下來通過newTransactionStatus建立了DefaultTransactionStatus物件,這個物件主要就是儲存當前事務的一些狀態資訊,需要特別注意newTransaction屬性設定為了true,表示是一個新事務。

狀態物件建立好之後就是通過doBegin開啟事務,這是一個模板方法:

protected void doBegin(Object transaction,TransactionDefinition definition) {
 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 Connection con = null;

 try {
 //如果沒有資料庫連線
 if (!txObject.hasConnectionHolder() ||
 txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
 //從連線池裡面獲取連線
 Connection newCon = obtainDataSource().getConnection();
 if (logger.isDebugEnabled()) {
 logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
 }
 //把連線包裝成ConnectionHolder,然後設定到事務物件中
 txObject.setConnectionHolder(new ConnectionHolder(newCon),true);
 }

 txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
 con = txObject.getConnectionHolder().getConnection();

 //從資料庫連線中獲取隔離級別
 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con,definition);
 txObject.setPreviousIsolationLevel(previousIsolationLevel);

 // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,// so we don't want to do it unnecessarily (for example if we've explicitly
 // configured the connection pool to set it already).
 if (con.getAutoCommit()) {
 txObject.setMustRestoreAutoCommit(true);
 if (logger.isDebugEnabled()) {
 logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
 }
 //關閉連線的自動提交,其實這步就是開啟了事務
 con.setAutoCommit(false);
 }

 //設定只讀事務 從這一點設定的時間點開始(時間點a)到這個事務結束的過程中,其他事務所提交的資料,該事務將看不見!
 //設定只讀事務就是告訴資料庫,我這個事務內沒有新增,修改,刪除操作只有查詢操作,不需要資料庫鎖等操作,減少資料庫壓力
 prepareTransactionalConnection(con,definition);

 //自動提交關閉了,就說明已經開啟事務了,事務是活動的
 txObject.getConnectionHolder().setTransactionActive(true);

 int timeout = determineTimeout(definition);
 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
 txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
 }

 // Bind the connection holder to the thread.
 if (txObject.isNewConnectionHolder()) {
 //如果是新建立的事務,則建立當前執行緒和資料庫連線的關係
 TransactionSynchronizationManager.bindResource(obtainDataSource(),txObject.getConnectionHolder());
 }
 }

 catch (Throwable ex) {
 if (txObject.isNewConnectionHolder()) {
 DataSourceUtils.releaseConnection(con,obtainDataSource());
 txObject.setConnectionHolder(null,false);
 }
 throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction",ex);
 }
 }

這個方法裡面主要做了六件事:

首先從連線池獲取連線並儲存到DataSourceTransactionObject物件中。

關閉資料庫的自動提交,也就是開啟事務。

獲取資料庫的隔離級別。

根據屬性設定該事務是否為只讀事務。

將該事務標識為活動事務(transactionActive=true)。

將ConnectionHolder物件與當前執行緒繫結。

完成之後通過prepareSynchronization將事務的屬性和狀態設定到TransactionSynchronizationManager物件中進行管理。最後返回到createTransactionIfNecessary方法中建立TransactionInfo物件與當前執行緒繫結並返回。

通過以上的步驟就開啟了事務,接下來就是通過proceedWithInvocation呼叫其它切面,這裡我們先假設沒有其它切面了,那麼就是直接呼叫到A類的addA方法,在這個方法中又呼叫了B類的addB方法,那麼肯定也是呼叫到代理類的方法,因此又會進入到createTransactionIfNecessary方法中。但這次進來通過isExistingTransaction判斷是存在事務的,因此會進入到handleExistingTransaction方法:

 private TransactionStatus handleExistingTransaction(
 TransactionDefinition definition,Object transaction,boolean debugEnabled)
 throws TransactionException {

 //不允許有事務,直接異常
 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
 throw new IllegalTransactionStateException(
 "Existing transaction found for transaction marked with propagation 'never'");
 }

 //以非事務方式執行操作,如果當前存在事務,就把當前事務掛起
 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
 if (debugEnabled) {
 logger.debug("Suspending current transaction");
 }
 //掛起當前事務
 Object suspendedResources = suspend(transaction);
 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
 //修改事務狀態資訊,把事務的一些資訊儲存到當前執行緒中,ThreadLocal中
 return prepareTransactionStatus(
 definition,false,suspendedResources);
 }

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
 if (debugEnabled) {
 logger.debug("Suspending current transaction,creating new transaction with name [" +
 definition.getName() + "]");
 }
 //掛起
 SuspendedResourcesHolder suspendedResources = suspend(transaction);
 try {
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 DefaultTransactionStatus status = newTransactionStatus(
 definition,suspendedResources);
 doBegin(transaction,definition);
 prepareSynchronization(status,definition);
 return status;
 }
 catch (RuntimeException | Error beginEx) {
 resumeAfterBeginException(transaction,suspendedResources,beginEx);
 throw beginEx;
 }
 }

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 if (!isNestedTransactionAllowed()) {
 throw new NestedTransactionNotSupportedException(
 "Transaction manager does not allow nested transactions by default - " +
 "specify 'nestedTransactionAllowed' property with value 'true'");
 }
 if (debugEnabled) {
 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
 }
 //預設是可以巢狀事務的
 if (useSavepointForNestedTransaction()) {
 // Create savepoint within existing Spring-managed transaction,// through the SavepointManager API implemented by TransactionStatus.
 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
 DefaultTransactionStatus status =
 prepareTransactionStatus(definition,null);
 //建立回滾點
 status.createAndHoldSavepoint();
 return status;
 }
 else {
 // Nested transaction through nested begin and commit/rollback calls.
 // Usually only for JTA: Spring synchronization might get activated here
 // in case of a pre-existing JTA transaction.
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 DefaultTransactionStatus status = newTransactionStatus(
 definition,null);
 doBegin(transaction,definition);
 return status;
 }
 }
 
 // 省略
 
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 return prepareTransactionStatus(definition,null);
 }

這裡面也是對每個傳播屬性的判斷,先看PROPAGATION_REQUIRES_NEW的處理,因為該屬性要求每次呼叫都開啟一個新的事務,所以首先會將當前事務掛起,怎麼掛起呢?

 protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
 if (TransactionSynchronizationManager.isSynchronizationActive()) {
 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
 try {
 Object suspendedResources = null;
 //第一次進來,肯定為null的
 if (transaction != null) {
 //吧connectionHolder設定為空
 suspendedResources = doSuspend(transaction);
 }

 //做資料還原操作
 String name = TransactionSynchronizationManager.getCurrentTransactionName();
 TransactionSynchronizationManager.setCurrentTransactionName(null);
 boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
 TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
 Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
 boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
 TransactionSynchronizationManager.setActualTransactionActive(false);
 return new SuspendedResourcesHolder(
 suspendedResources,suspendedSynchronizations,name,readOnly,isolationLevel,wasActive);
 }
 catch (RuntimeException | Error ex) {
 // doSuspend failed - original transaction is still active...
 doResumeSynchronization(suspendedSynchronizations);
 throw ex;
 }
 }
 else if (transaction != null) {
 // Transaction active but no synchronization active.
 Object suspendedResources = doSuspend(transaction);
 return new SuspendedResourcesHolder(suspendedResources);
 }
 else {
 // Neither transaction nor synchronization active.
 return null;
 }
 }

 protected Object doSuspend(Object transaction) {
 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 txObject.setConnectionHolder(null);
 //解除繫結關係,
 return TransactionSynchronizationManager.unbindResource(obtainDataSource());
 }

這裡明顯是進入第一個if並且會呼叫到doSuspend方法,整體來說掛起事務很簡單:首先將DataSourceTransactionObject的ConnectionHolder設定為空並解除與當前執行緒的繫結,之後將解除繫結的ConnectionHolder和其它屬性(事務名稱、隔離級別、只讀屬性)通通封裝到SuspendedResourcesHolder物件,並將當前事務的活動狀態設定為false。掛起事務之後又通過newTransactionStatus建立了一個新的事務狀態並呼叫doBegin開啟事務,這裡不再重複分析。

接著來看PROPAGATION_NESTED:

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 if (!isNestedTransactionAllowed()) {
 throw new NestedTransactionNotSupportedException(
 "Transaction manager does not allow nested transactions by default - " +
 "specify 'nestedTransactionAllowed' property with value 'true'");
 }
 if (debugEnabled) {
 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
 }
 //預設是可以巢狀事務的
 if (useSavepointForNestedTransaction()) {
 // Create savepoint within existing Spring-managed transaction,definition);
 return status;
 }
 }

這裡面可以看到如果允許巢狀事務,就會建立一個DefaultTransactionStatus物件(注意newTransaction是false,表明不是一個新事務)和回滾點;如果不允許巢狀,就會建立新事務並開啟。

當上面的判斷都不滿足時,也就是傳播屬性為預設PROPAGATION_REQUIRED時,則只是建立了一個newTransaction為false的DefaultTransactionStatus返回。

完成之後又是呼叫proceedWithInvocation,那麼就是執行B類的addB方法,假如沒有發生異常,那麼就會回到切面呼叫commitTransactionAfterReturning提交addB的事務:

 protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
 if (txInfo != null && txInfo.getTransactionStatus() != null) {
 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
 }
 }

 public final void commit(TransactionStatus status) throws TransactionException {

 processCommit(defStatus);
 }

 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
 try {
 boolean beforeCompletionInvoked = false;

 try {
 boolean unexpectedRollback = false;
 prepareForCommit(status);
 triggerBeforeCommit(status);
 triggerBeforeCompletion(status);
 beforeCompletionInvoked = true;

 if (status.hasSavepoint()) {
 if (status.isDebug()) {
 logger.debug("Releasing transaction savepoint");
 }
 // 如果是nested,沒有提交,只是將savepoint清除掉了
 unexpectedRollback = status.isGlobalRollbackOnly();
 status.releaseHeldSavepoint();
 }
 //如果都是PROPAGATION_REQUIRED,最外層的才會走進來統一提交,如果是PROPAGATION_REQUIRES_NEW,每一個事務都會進來
 else if (status.isNewTransaction()) {
 if (status.isDebug()) {
 logger.debug("Initiating transaction commit");
 }
 unexpectedRollback = status.isGlobalRollbackOnly();
 doCommit(status);
 }
 else if (isFailEarlyOnGlobalRollbackOnly()) {
 unexpectedRollback = status.isGlobalRollbackOnly();
 }
 }
 }
 finally {
 cleanupAfterCompletion(status);
 }
 }

主要邏輯在processCommit方法中。如果存在回滾點,可以看到並沒有提交事務,只是將當前事務的回滾點清除了;而如果是新事務,就會呼叫doCommit提交事務,也就是隻有PROPAGATION_REQUIRED屬性下的最外層事務和PROPAGATION_REQUIRES_NEW屬性下的事務能提交。事務提交完成後會呼叫cleanupAfterCompletion清除當前事務的狀態,如果有掛起的事務還會通過resume恢復掛起的事務(將解綁的連線和當前執行緒繫結以及將之前儲存的事務狀態重新設定回去)。當前事務正常提交後,那麼就會輪到addA方法提交,處理邏輯同上,不再贅述。

如果呼叫addB發生異常,就會通過completeTransactionAfterThrowing進行回滾:

 protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo,Throwable ex) {
 if (txInfo != null && txInfo.getTransactionStatus() != null) {
 if (logger.isTraceEnabled()) {
 logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
 "] after exception: " + ex);
 }
 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
 try {
 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
 }
 }
 }
 }

 public final void rollback(TransactionStatus status) throws TransactionException {
 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
 processRollback(defStatus,false);
 }

 private void processRollback(DefaultTransactionStatus status,boolean unexpected) {
 try {
 boolean unexpectedRollback = unexpected;

 try {
 triggerBeforeCompletion(status);

 //按照巢狀事務按照回滾點回滾
 if (status.hasSavepoint()) {
 if (status.isDebug()) {
 logger.debug("Rolling back transaction to savepoint");
 }
 status.rollbackToHeldSavepoint();
 }
 //都為PROPAGATION_REQUIRED最外層事務統一回滾
 else if (status.isNewTransaction()) {
 if (status.isDebug()) {
 logger.debug("Initiating transaction rollback");
 }
 doRollback(status);
 }
 else {
 // Participating in larger transaction
 if (status.hasTransaction()) {
 if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
 if (status.isDebug()) {
 logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
 }
 doSetRollbackOnly(status);
 }
 else {
 if (status.isDebug()) {
 logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
 }
 }
 }
 else {
 logger.debug("Should roll back transaction but cannot - no transaction available");
 }
 // Unexpected rollback only matters here if we're asked to fail early
 if (!isFailEarlyOnGlobalRollbackOnly()) {
 unexpectedRollback = false;
 }
 }
 }
 catch (RuntimeException | Error ex) {
 triggerAfterCompletion(status,TransactionSynchronization.STATUS_UNKNOWN);
 throw ex;
 }

 triggerAfterCompletion(status,TransactionSynchronization.STATUS_ROLLED_BACK);

 // Raise UnexpectedRollbackException if we had a global rollback-only marker
 if (unexpectedRollback) {
 throw new UnexpectedRollbackException(
 "Transaction rolled back because it has been marked as rollback-only");
 }
 }
 finally {
 cleanupAfterCompletion(status);
 }
 }

流程和提交是一樣的,先是判斷有沒有回滾點,如果有就回到到回滾點並清除該回滾點;如果沒有則判斷是不是新事務(PROPAGATION_REQUIRED屬性下的最外層事務和PROPAGATION_REQUIRES_NEW屬性下的事務),滿足則直接回滾當前事務。回滾完成後同樣需要清除掉當前的事務狀態並恢復掛起的連線。另外需要特別注意的是在catch裡面呼叫完回滾邏輯後,還通過throw丟擲了異常,這意味著什麼?意味著即使是巢狀事務,內層事務的回滾也會導致外層事務的回滾,也就是addA的事務也會跟著回滾。

至此,事務的傳播原理分析完畢,深入看每個方法的實現是很複雜的,但如果僅僅是分析各個傳播屬性對事務的影響,則有一個簡單的方法。我們可以將內層事務切面等效替換掉invocation.proceedWithInvocation方法,比如上面兩個類的呼叫可以看作是下面這樣:

// addA的事務
TransactionInfo txInfo = createTransactionIfNecessary(tm,joinpointIdentification);
Object retVal = null;
try {
 // addB的事務
 TransactionInfo txInfo = createTransactionIfNecessary(tm,joinpointIdentification);
 Object retVal = null;
 try {
 retVal = invocation.proceedWithInvocation();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務回滾
 completeTransactionAfterThrowing(txInfo,ex);
 throw ex;
 }
 finally {
 cleanupTransactionInfo(txInfo);
 }
 //事務提交
 commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
 //事務回滾
 completeTransactionAfterThrowing(txInfo,ex);
 throw ex;
}
//事務提交
commitTransactionAfterReturning(txInfo);

這樣看是不是很容易就能分析出事務之間的影響以及是提交還是回滾了?下面來看幾個例項分析。

例項分析

我再新增一個C類,和addC的方法,然後在addA裡面呼叫這個方法。

TransactionInfo txInfo = createTransactionIfNecessary(tm,joinpointIdentification);
 Object retVal = null;
 try {
 b.addB();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務回滾
 completeTransactionAfterThrowing(txInfo,ex);
 throw ex;
 }
 //事務提交
 commitTransactionAfterReturning(txInfo);

 // addC的事務
 TransactionInfo txInfo = createTransactionIfNecessary(tm,joinpointIdentification);
 Object retVal = null;
 try {
 c.addC();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務回滾
 completeTransactionAfterThrowing(txInfo,ex);
 throw ex;
 }
 //事務提交
 commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
 //事務回滾
 completeTransactionAfterThrowing(txInfo,ex);
 throw ex;
}
//事務提交
commitTransactionAfterReturning(txInfo);

等效替換後就是上面這個程式碼,我們分別來分析。

都是PROPAGATION_REQUIRED屬性:通過上面的分析,我們知道三個方法都是同一個連線和事務,那麼任何一個出現異常則都會回滾。

addB為PROPAGATION_REQUIRES_NEW:

如果B中丟擲異常,那麼B中肯定會回滾,接著異常向上拋,導致A事務整體回滾;

如果C中丟擲異常,不難看出C和A都會回滾,但B已經提交了,因此不會受影響。

addC為PROPAGATION_NESTED,addB為PROPAGATION_REQUIRES_NEW:

如果B中丟擲異常,那麼B回滾並丟擲異常,A也回滾,C不會執行;

如果C中丟擲異常,先是回滾到回滾點並丟擲異常,所以A也回滾,但B此時已經提交,不受影響。

都是PROPAGATION_NESTED:雖然建立了回滾點,但是仍然是同一個連線,任何一個發生異常都會回滾,如果不想影響彼此,可以try-catch生吞子事務的異常實現。

還有其它很多情況,這裡就不一一列舉了,只要使用上面的分析方法都能夠很輕鬆的分析出來。

總結

本篇詳細分析了事務的傳播原理,另外還有隔離級別,這在Spring中沒有體現,需要我們自己結合資料庫的知識進行分析設定。最後我們還需要考慮宣告式事務和程式設計式事務的優缺點,宣告式事務雖然簡單,但不適合用在長事務中,會佔用大量連線資源,這時就需要考慮利用程式設計式事務的靈活性了。

總而言之,事務的使用並不是一律預設就好,介面的一致性和吞吐量與事務有著直接關係,嚴重情況下可能會導致系統崩潰。

以上這篇這一次搞懂Spring事務是如何傳播的就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。