Spring事务源码分析

介绍

先大概说一下Spring事务实现原理:我们知道事务的ACID属性和其涉及到数据一致性,Spring这边是通过AOP的方式将事务处理和业务代码分离出来。其中通过TransactionInterceptor来实现对代理方法的拦截,将事务处理的功能编织起来。由于底层不同的数据库导致不同的事务支持,Spring也对数据源做了适配。

TransactionProxyFactoryBean源码分析

我们先从TransactionProxyFactoryBean这个类入手,通过这个类你会知道Spring是如何通过AOP来完成事务管理的

public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements BeanFactoryAware {
    //这个拦截器就是发挥来AOP作用,其中封装了对事务的操作
    private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();

    private Pointcut pointcut;

    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionInterceptor.setTransactionManager(transactionManager);
    }
    //通过依赖注入将配置事务属性
    public void setTransactionAttributes(Properties transactionAttributes) {
        this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
    }
    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
        this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource);
    }

    public void setPointcut(Pointcut pointcut) {
        this.pointcut = pointcut;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.transactionInterceptor.setBeanFactory(beanFactory);
    }
    //创建AOP的通知器
    @Override
    protected Object createMainInterceptor() {
        this.transactionInterceptor.afterPropertiesSet();事务处理完成AOP配置
        if (this.pointcut != null) {
            return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
        }
        else {
            // Rely on default pointcut.
            return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
        }
    }

    @Override
    protected void postProcessProxyFactory(ProxyFactory proxyFactory) {
        proxyFactory.addInterface(TransactionalProxy.class);
    }

}

TransactionInterceptor 源码分析

经过TransactionProxyFactoryBean 的AOP包装,不会直接作用其设置的目标对象,而是会被设置的事务处理拦截器TransactionInterceptor 拦截,来完成事务的创建、提交、回滚等底层的操作,接下来分析TransactionInterceptor 源码,看看事务处理拦截器是如何是设计和实现的

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //得到代理的目标对象,将事务属性传递给目标对象
        Class targetClass = invocation.getThis() != null?AopUtils.getTargetClass(invocation.getThis()):null;
        // If the transaction attribute is null, the method is non-transactional.
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass);

        //这里要区分不同的PlatformTransactionManager,因为它们的调用方式不同
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 创建事务、同时将创建事务过程得到的信息赋给TransactionInfo,让TransactionInfo保存事务的状态
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // 使用调用沿着拦截器链进行,使最后目标对象的方法得到调用
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 如果整个事务过程出现异常,根据具体情况来决定回滚还是提交
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                //这里把TransactionInfo设置oldTransactionInfo,表示该事务已经处理完
                cleanupTransactionInfo(txInfo);
            }
            //通过事务处理器来对事务进行提交
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                            @Override
                            public Object doInTransaction(TransactionStatus status) {
                                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                try {
                                    return invocation.proceedWithInvocation();
                                }
                                catch (Throwable ex) {
                                    if (txAttr.rollbackOn(ex)) {
                                        // A RuntimeException: 会导致回滚
                                        if (ex instanceof RuntimeException) {
                                            throw (RuntimeException) ex;
                                        }
                                        else {
                                            throw new ThrowableHolderException(ex);
                                        }
                                    } // 正常返回,事务提交
                                    else {
                                        // A normal return value: will lead to a commit.
                                        return new ThrowableHolder(ex);
                                    }
                                }
                                finally {
                                    cleanupTransactionInfo(txInfo);
                                }
                            }
                        });

                // Check result: It might indicate a Throwable to rethrow.
                if (result instanceof ThrowableHolder) {
                    throw ((ThrowableHolder) result).getThrowable();
                }
                else {
                    return result;
                }
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
        }
    }
}

提交源码分析

在invoke()方法中调用commitTransactionAfterReturning(txInfo)进行事务提交,在该方法中使用事务处理器进行提交,再不断进入提交的入口,我们可以看到processCommit()方法,

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
                //事务提交的准备工作由事务处理器来完成
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    status.releaseHeldSavepoint();
                }
                //如果当前事务是新事务,调用事务处理器进行提交,如果不是新事务,则不提交,由已经存在事务来完成提交
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }//具体的事务提交由具体的事务处理器来完成
                    doCommit(status);
                }
                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (globalRollbackOnly) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
            catch (Error err) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, err);
                throw err;
            }

            // 触发回滚
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

回滚源码分析

在invoke()方法中调用txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus())进行事务回滚,再不断进入回滚方法的入口,我们可以看到processRollback()方法

private void processRollback(DefaultTransactionStatus status) {
        try {
            try {
                triggerBeforeCompletion(status);
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    doRollback(status);
                }
                else if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
            }
            catch (RuntimeException ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
            catch (Error err) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw err;
            }
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

invoke方法里面做什么了?

1、读取事务属性配置。
2、根据配置决定用哪个事务处理器。
3、创建事务。
4、用TransactionInfo 保存事务状态。
5、对拦截器链进行处理。
6、更新TransactionInfo 信息。
7、事务提交。

在invoke方法中,看到整个事务处理和AOP拦截器中实现的全过程,这个方法通过Spring AOP框架实现了具体的事务处理,可以看出一个Spring AOP应用

总结

1、一系列的AOP配置
2、对事务方法的调用
3、启动拦截器的invoke方法,完成事务处理

    原文作者:Spring MVC
    原文地址: https://blog.csdn.net/jyxmust/article/details/81260007
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞