Spring TX Source Code Analysis

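Spring transaction management supports both global transactions (which rely on Java's JTA) and local transactions. The Spring TX execution flow is:

1. When the Spring IoC container initializes, it loads the transaction management aspect configuration;

2. A proxy is generated for the target class: AspectJAwareAdvisorAutoProxyCreator is a BeanPostProcessor, so it can create the proxy object while the container initializes the bean;

3. TransactionInterceptor intercepts method calls on the target object and performs transaction management.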
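
The three steps above can be sketched without Spring at all. The following is only an illustration under stated assumptions: it uses a plain JDK dynamic proxy in place of Spring's AOP proxy, and AccountService, SimpleAccountService and TxProxyDemo are hypothetical names that do not appear in the Spring source. The handler plays the role of TransactionInterceptor and records "begin", "commit" and "rollback" events instead of touching a database.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service interface and implementation, used only for illustration.
interface AccountService {
    void transfer(int amount);
}

class SimpleAccountService implements AccountService {
    @Override
    public void transfer(int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("negative amount");
        }
    }
}

class TxProxyDemo {
    // Records the simulated transaction events so their order can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    // Wraps the target in a JDK dynamic proxy; the handler stands in for the interceptor.
    static AccountService wrap(AccountService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            EVENTS.add("begin");
            try {
                Object result = method.invoke(target, args);
                EVENTS.add("commit");
                return result;
            } catch (InvocationTargetException e) {
                EVENTS.add("rollback");
                // Rethrow the exception raised by the target method itself.
                throw e.getCause();
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    public static void main(String[] args) {
        AccountService service = wrap(new SimpleAccountService());
        service.transfer(10);
        try {
            service.transfer(-1);
        } catch (IllegalArgumentException expected) {
            // The proxy recorded a rollback and rethrew the original exception.
        }
        System.out.println(EVENTS); // prints [begin, commit, begin, rollback]
    }
}
```

The caller only ever sees the AccountService interface, which is why the transaction handling stays invisible to business code.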


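I. Generating the proxy class for the TX aspect. Once the aspect is configured, AspectJAwareAdvisorAutoProxyCreator, a subclass of AbstractAutoProxyCreator, creates the corresponding proxy while the IoC container initializes each bean. For how the aspect proxy is created, see http://blog.csdn.net/bubaxiu/article/details/42245947.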

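II. Invoking and intercepting transactional methods: TransactionInterceptor intercepts the matching methods, and its parent class TransactionAspectSupport creates and completes the transaction.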

TransactionInterceptor.invoke() intercepts the target method and calls invokeWithinTransaction, inherited from the parent class, to handle the transaction.

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //Determine the target class of the invocation (may be null)
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction so the call runs within a transaction
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        });
    }

TransactionAspectSupport.invokeWithinTransaction() resolves the transaction attributes, starts the transaction, invokes the target method, and then rolls back or commits:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {

        // Look up the transaction attribute; if it is null, the method is non-transactional
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        // Resolve the matching PlatformTransactionManager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass);

        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //Standard demarcation: start the transaction through PlatformTransactionManager.getTransaction using the given TransactionAttribute, then commit or roll back below
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                //Proceed down the advice chain, which eventually invokes the target method
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // The target threw: roll back or commit according to the rollback rules
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                //Clear the TransactionInfo of this invocation (restores the previous one for the thread)
                cleanupTransactionInfo(txInfo);
            }
            //The target method returned normally: commit
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            //Run the transaction through the CallbackPreferringPlatformTransactionManager extension
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                            @Override
                            public Object doInTransaction(TransactionStatus status) {
                                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                try {
                                    return invocation.proceedWithInvocation();
                                }
                                catch (Throwable ex) {
                                    if (txAttr.rollbackOn(ex)) {
                                        if (ex instanceof RuntimeException) {
                                            throw (RuntimeException) ex;
                                        }
                                        else {
                                            throw new ThrowableHolderException(ex);
                                        }
                                    }
                                    else {
                                        return new ThrowableHolder(ex);
                                    }
                                }
                                finally {
                                    cleanupTransactionInfo(txInfo);
                                }
                            }
                        });
......
        }
    }
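
The ordering in the standard branch above is easy to misread, because cleanupTransactionInfo sits in a finally block and therefore runs before the commit. The sketch below is a simplified stand-in, not the Spring implementation: TxTemplateSketch is a hypothetical class, a Supplier replaces InvocationCallback, and the four calls are reduced to entries in an event list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in that only records the order of the four steps.
class TxTemplateSketch {
    final List<String> events = new ArrayList<>();

    <T> T invokeWithinTransaction(Supplier<T> invocation) {
        events.add("createTransactionIfNecessary");
        T retVal;
        try {
            // Corresponds to invocation.proceedWithInvocation() in the listing.
            retVal = invocation.get();
        } catch (RuntimeException ex) {
            // Failure path: decide between rollback and commit, then rethrow.
            events.add("completeTransactionAfterThrowing");
            throw ex;
        } finally {
            // Runs on both paths, and before the commit on the success path.
            events.add("cleanupTransactionInfo");
        }
        events.add("commitTransactionAfterReturning");
        return retVal;
    }

    public static void main(String[] args) {
        TxTemplateSketch sketch = new TxTemplateSketch();
        sketch.invokeWithinTransaction(() -> "done");
        System.out.println(sketch.events);
    }
}
```

On the failure path the commit step is never reached, because the rethrown exception leaves the method right after the finally block.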

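AbstractPlatformTransactionManager.getTransaction(): the abstract transaction manager provides the template for creating a transaction, and the concrete transaction managers rely on it. Based on the configured transaction attributes and on what is bound to the current thread, it decides whether and how a transaction should be created, then delegates the low-level details to the concrete transaction manager. The template method reads as follows: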

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        //Obtain the transaction object from the concrete transaction manager
        Object transaction = doGetTransaction();
        
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            definition = new DefaultTransactionDefinition();
        }

        if (isExistingTransaction(transaction)) {
            //A transaction already exists on the current thread: handle the call according to the propagation behavior
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Validate the configured timeout: values below TIMEOUT_DEFAULT (-1) are invalid
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No transaction exists on the current thread: the propagation behavior decides how to proceed
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            //No transaction exists and propagation is PROPAGATION_MANDATORY: throw an exception
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        //PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW and PROPAGATION_NESTED create a new transaction when none exists
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //Let the concrete transaction manager begin the new transaction
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
......
}
private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
         //Propagation NOT_SUPPORTED: suspend the current transaction and run this call without one
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            //Suspend the transaction
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            //Return a status without a transaction, holding the suspended resources
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        
        //Propagation REQUIRES_NEW: suspend the current transaction and start a new one
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            //Suspend the current transaction
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //Let the concrete transaction manager begin the new transaction
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            catch (Error beginErr) {
                resumeAfterBeginException(transaction, suspendedResources, beginErr);
                throw beginErr;
            }
        }
        
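        //Nested transaction handling. A nested transaction is a child of the outer transaction: it is committed when the outer transaction commits, and the same rule applies to rollback.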
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

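        //Reaching this point means PROPAGATION_SUPPORTS, PROPAGATION_REQUIRED or PROPAGATION_MANDATORY: handleExistingTransaction creates no new transaction here;
        //it builds a transaction status that participates in the existing transaction and returns it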
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
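
The branching in getTransaction() and handleExistingTransaction() boils down to a lookup on two inputs: the propagation behavior and whether a transaction already exists. The sketch below condenses that into one function. Propagation, TxAction and PropagationSketch are hypothetical names invented for this illustration. The no-transaction outcome for SUPPORTS, NOT_SUPPORTED and NEVER falls in the part of getTransaction() that the listing elides, so that branch reflects Spring's documented behavior rather than the code shown here.

```java
// Hypothetical enums: the real code compares int constants on TransactionDefinition.
enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

enum TxAction {
    START_NEW, JOIN_EXISTING, SUSPEND_AND_START_NEW,
    SUSPEND_AND_RUN_WITHOUT_TX, CREATE_NESTED, RUN_WITHOUT_TX, FAIL
}

class PropagationSketch {
    static TxAction decide(Propagation propagation, boolean existingTransaction) {
        if (existingTransaction) {
            // Mirrors handleExistingTransaction().
            switch (propagation) {
                case NEVER:         return TxAction.FAIL;
                case NOT_SUPPORTED: return TxAction.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:  return TxAction.SUSPEND_AND_START_NEW;
                case NESTED:        return TxAction.CREATE_NESTED;
                default:            return TxAction.JOIN_EXISTING; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        // Mirrors the second half of getTransaction().
        switch (propagation) {
            case MANDATORY:
                return TxAction.FAIL;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return TxAction.START_NEW;
            default:
                return TxAction.RUN_WITHOUT_TX; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": none -> " + decide(p, false)
                    + ", existing -> " + decide(p, true));
        }
    }
}
```

Reading the table row by row makes the difference between REQUIRES_NEW and NESTED visible: both differ from REQUIRED only when a transaction already exists.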

TransactionAspectSupport.completeTransactionAfterThrowing() decides what to do when the target method throws:

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.hasTransaction()) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            //Check whether this exception should trigger a rollback
            if (txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    //Roll back through the transaction manager
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                .....
            }
            else {
                //The exception does not trigger a rollback, so commit through the transaction manager
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                ......
            }
        }
    }
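
Which branch is taken depends entirely on rollbackOn(ex). The listing does not show that method, so the sketch below assumes the default rule of Spring's DefaultTransactionAttribute: roll back on RuntimeException and Error, commit on checked exceptions. RollbackRuleSketch is a hypothetical class, and custom rollback rules (for example rollbackFor settings) are deliberately left out.

```java
import java.io.IOException;

// Hypothetical helper illustrating the assumed default rollback rule.
class RollbackRuleSketch {
    // Assumed default: unchecked exceptions and errors trigger a rollback.
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    // Returns the action completeTransactionAfterThrowing would take under that rule.
    static String complete(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }

    public static void main(String[] args) {
        System.out.println(complete(new IllegalStateException())); // prints rollback
        System.out.println(complete(new IOException()));           // prints commit
    }
}
```

This is why a checked exception thrown from a transactional method still commits unless a rollback rule says otherwise.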

TransactionAspectSupport.commitTransactionAfterReturning() runs after the target method completes successfully and before the value is returned to the caller; the actual logic lives in AbstractPlatformTransactionManager.commit():

public final void commit(TransactionStatus status) throws TransactionException {
        //A transaction that has already completed cannot be committed again
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        //If the transactional code marked this status rollback-only, roll back instead
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            processRollback(defStatus);
            return;
        }
        //If the transaction is globally marked rollback-only and the manager is not configured to commit in that case, roll back
        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            processRollback(defStatus);
            //Raise an exception if this is a new transaction or fail-early on global rollback-only is enabled
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
            return;
        }
        //Perform the actual commit
        processCommit(defStatus);
    }
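
A call to commit() therefore does not always commit. The sketch below reduces the method to its decision logic. CommitDecisionSketch, Status and UnexpectedRollback are hypothetical names, the two boolean fields stand in for shouldCommitOnGlobalRollbackOnly() and isFailEarlyOnGlobalRollbackOnly(), and processRollback/processCommit are replaced by returned strings.

```java
// Hypothetical model of the decision order inside commit().
class CommitDecisionSketch {
    static class Status {
        boolean completed;
        boolean localRollbackOnly;
        boolean globalRollbackOnly;
        boolean newTransaction;
    }

    static class UnexpectedRollback extends RuntimeException {
        UnexpectedRollback(String message) {
            super(message);
        }
    }

    // Assumed settings for this sketch; both are false here.
    boolean commitOnGlobalRollbackOnly = false;
    boolean failEarlyOnGlobalRollbackOnly = false;

    String commit(Status status) {
        if (status.completed) {
            throw new IllegalStateException("Transaction is already completed");
        }
        if (status.localRollbackOnly) {
            status.completed = true;
            return "rollback";
        }
        if (!commitOnGlobalRollbackOnly && status.globalRollbackOnly) {
            status.completed = true;
            if (status.newTransaction || failEarlyOnGlobalRollbackOnly) {
                // The caller asked for a commit but got a rollback, so it is told explicitly.
                throw new UnexpectedRollback("Transaction rolled back because it has been marked as rollback-only");
            }
            return "rollback";
        }
        status.completed = true;
        return "commit";
    }

    public static void main(String[] args) {
        Status status = new Status();
        status.localRollbackOnly = true;
        System.out.println(new CommitDecisionSketch().commit(status)); // prints rollback
    }
}
```

The global rollback-only case typically arises when an inner participating method failed and marked the shared transaction, while the outer method caught the exception and returned normally.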

III. How subclasses of AbstractPlatformTransactionManager operate on the transaction, using DataSourceTransactionManager as an example:

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    private DataSource dataSource;
......
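    //Obtains the transaction object. For a database, the Connection carries out the transactional work. This method looks up the
    //ConnectionHolder (the wrapper around the Connection) bound to the current thread and stores it in a DataSourceTransactionObject, so one thread shares one Connection
    @Override
    protected Object doGetTransaction() {
        //Create the DataSource transaction object
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        //Allow savepoints when nested transactions are enabled
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        //Look up the ConnectionHolder bound to the current thread for this DataSource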
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

    //Check whether a transaction already exists
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
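        //Decided by whether a ConnectionHolder exists and its transactionActive flag is set
        //With nested method calls, if the transaction was suspended during a call, a later method that needs a transaction must start a new one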
        return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
    }

    /**
     * This implementation sets the isolation level but ignores the timeout.
     * Begins the transaction.
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            //If the transaction object has no ConnectionHolder, or its holder is already synchronized with a transaction
            if (txObject.getConnectionHolder() == null ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                //Obtain a new connection from the DataSource
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                //Store it in the transaction object as a new ConnectionHolder
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            //Mark the ConnectionHolder as synchronized with the transaction
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            //Get the database connection held by the transaction object
            con = txObject.getConnectionHolder().getConnection();
            //Apply the read-only flag and isolation level from the definition; returns the previous isolation level, if any
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            //Remember the previous isolation level so it can be restored after the transaction
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            //If the connection is in auto-commit mode, switch it to manual commit
            if (con.getAutoCommit()) {
                //Remember that auto-commit must be restored after the transaction
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                //Disable auto-commit so that the transaction is committed explicitly
                con.setAutoCommit(false);
            }
            //Mark the transaction as active on the ConnectionHolder
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            //Apply the timeout only if one is configured (anything other than TIMEOUT_DEFAULT)
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // Bind the ConnectionHolder to the current thread, keyed by the DataSource
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

    //Suspend the transaction
    @Override
    protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        txObject.setConnectionHolder(null);
        ConnectionHolder conHolder = (ConnectionHolder)
                TransactionSynchronizationManager.unbindResource(this.dataSource);
        return conHolder;
    }
    
    //Resume the transaction
    @Override
    protected void doResume(Object transaction, Object suspendedResources) {
        ConnectionHolder conHolder = (ConnectionHolder) suspendedResources;
        TransactionSynchronizationManager.bindResource(this.dataSource, conHolder);
    }

    //Commit the transaction
    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        //Get the transaction object and its connection
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            //Commit on the JDBC connection
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

    //Roll back the transaction
    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }

    //Mark the transaction as rollback-only
    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
                    "] rollback-only");
        }
        txObject.setRollbackOnly();
    }

    //Clean up after the transaction completes
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // Unbind the ConnectionHolder from the current thread
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(this.dataSource);
        }

        // Reset the connection
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }
            DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        if (txObject.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
            }
            //Release the connection
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }
        
        txObject.getConnectionHolder().clear();
    }
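
doBegin, doSuspend, doResume and doCleanupAfterCompletion all revolve around one idea: the ConnectionHolder is bound to the current thread under the DataSource as key. The sketch below imitates that with a plain ThreadLocal map. It is a simplified assumption of how such a registry can work, not the code of TransactionSynchronizationManager; ThreadBoundResources and SuspendResumeDemo are hypothetical names, and strings stand in for connection holders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical thread-bound registry, loosely modeled on bindResource/unbindResource/getResource.
class ThreadBoundResources {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound: " + key);
        }
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbind(Object key) {
        Object value = RESOURCES.get().remove(key);
        if (value == null) {
            throw new IllegalStateException("No value bound for: " + key);
        }
        return value;
    }
}

class SuspendResumeDemo {
    public static void main(String[] args) {
        Object dataSource = new Object(); // placeholder key standing in for the DataSource

        ThreadBoundResources.bind(dataSource, "connection-1");       // outer doBegin
        Object suspended = ThreadBoundResources.unbind(dataSource);  // doSuspend
        ThreadBoundResources.bind(dataSource, "connection-2");       // inner doBegin (REQUIRES_NEW)
        ThreadBoundResources.unbind(dataSource);                     // inner doCleanupAfterCompletion
        ThreadBoundResources.bind(dataSource, suspended);            // doResume

        System.out.println(ThreadBoundResources.get(dataSource));    // prints connection-1
    }
}
```

Because the binding is per thread, work handed to another thread does not see the connection, which is one reason a transaction does not follow a call into a new thread.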
Original author: Spring Cloud
Original article: https://blog.csdn.net/bubaxiu/article/details/42395749