介绍
先大概说一下Spring事务实现原理:我们知道事务的ACID属性和其涉及到数据一致性,Spring这边是通过AOP的方式将事务处理和业务代码分离出来。其中通过TransactionInterceptor来实现对代理方法的拦截,将事务处理的功能编织起来。由于底层不同的数据库导致不同的事务支持,Spring也对数据源做了适配。
TransactionProxyFactoryBean源码分析
我们先从TransactionProxyFactoryBean这个类入手,通过这个类你会知道Spring是如何通过AOP来完成事务管理的
public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements BeanFactoryAware {
//这个拦截器就是发挥来AOP作用,其中封装了对事务的操作
private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
private Pointcut pointcut;
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionInterceptor.setTransactionManager(transactionManager);
}
//通过依赖注入将配置事务属性
public void setTransactionAttributes(Properties transactionAttributes) {
this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
}
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource);
}
public void setPointcut(Pointcut pointcut) {
this.pointcut = pointcut;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.transactionInterceptor.setBeanFactory(beanFactory);
}
//创建AOP的通知器
@Override
protected Object createMainInterceptor() {
this.transactionInterceptor.afterPropertiesSet();事务处理完成AOP配置
if (this.pointcut != null) {
return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
}
else {
// Rely on default pointcut.
return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
}
}
@Override
protected void postProcessProxyFactory(ProxyFactory proxyFactory) {
proxyFactory.addInterface(TransactionalProxy.class);
}
}
TransactionInterceptor 源码分析
经过TransactionProxyFactoryBean 的AOP包装,不会直接作用其设置的目标对象,而是会被设置的事务处理拦截器TransactionInterceptor 拦截,来完成事务的创建、提交、回滚等底层的操作,接下来分析TransactionInterceptor 源码,看看事务处理拦截器是如何是设计和实现的
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
public Object invoke(final MethodInvocation invocation) throws Throwable {
//得到代理的目标对象,将事务属性传递给目标对象
Class targetClass = invocation.getThis() != null?AopUtils.getTargetClass(invocation.getThis()):null;
// If the transaction attribute is null, the method is non-transactional.
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass);
//这里要区分不同的PlatformTransactionManager,因为它们的调用方式不同
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务、同时将创建事务过程得到的信息赋给TransactionInfo,让TransactionInfo保存事务的状态
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// 使用调用沿着拦截器链进行,使最后目标对象的方法得到调用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 如果整个事务过程出现异常,根据具体情况来决定回滚还是提交
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//这里把TransactionInfo设置oldTransactionInfo,表示该事务已经处理完
cleanupTransactionInfo(txInfo);
}
//通过事务处理器来对事务进行提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: 会导致回滚
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
} // 正常返回,事务提交
else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}
finally {
cleanupTransactionInfo(txInfo);
}
}
});
// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}
else {
return result;
}
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
}
提交源码分析
在invoke()方法中调用commitTransactionAfterReturning(txInfo)进行事务提交,在该方法中使用事务处理器进行提交,再不断进入提交的入口,我们可以看到processCommit()方法,
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
//事务提交的准备工作由事务处理器来完成
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
//如果当前事务是新事务,调用事务处理器进行提交,如果不是新事务,则不提交,由已经存在事务来完成提交
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}//具体的事务提交由具体的事务处理器来完成
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, err);
throw err;
}
// 触发回滚
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
回滚源码分析
在invoke()方法中调用txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus())进行事务回滚,再不断进入回滚方法的入口,我们可以看到processRollback()方法
private void processRollback(DefaultTransactionStatus status) {
try {
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}
invoke方法里面做什么了?
1、读取事务属性配置。
2、根据配置决定用哪个事务处理器。
3、创建事务。
4、用TransactionInfo 保存事务状态。
5、对拦截器链进行处理。
6、更新TransactionInfo 信息。
7、事务提交。
在invoke方法中,看到整个事务处理和AOP拦截器中实现的全过程,这个方法通过Spring AOP框架实现了具体的事务处理,可以看出一个Spring AOP应用
总结
1、一系列的AOP配置
2、对事务方法的调用
3、启动拦截器的invoke方法,完成事务处理