Spring Batch Source Code Analysis

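I. When a batch job is launched, job.execute(jobExecution) is called. Here job is a FlowJob, and jobExecution is created by calling the createJobExecution method of the jobRepository.
FlowJob extends org.springframework.batch.core.job.AbstractJob, so the job runs through AbstractJob#execute. This method runs the job, handles all listener and repository calls, and delegates the actual processing to the subclass's doExecute method.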

1. Register the job context for the current thread

JobSynchronizationManager.register(execution);

Inside org.springframework.batch.core.scope.context.JobSynchronizationManager#register, the work is delegated to the register method of JobSynchronizationManager's manager field.

The manager field of JobSynchronizationManager defaults to

new SynchronizationManagerSupport<JobExecution, JobContext>()

SynchronizationManagerSupport stores the executions of the current thread, together with a map from each execution to its context (new JobContext(execution)).
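The bookkeeping described above can be sketched in a few lines. This is a minimal illustration, not the Spring Batch implementation: the class name ContextRegistry and its methods are hypothetical, and the real class also counts registrations before it removes a context.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the idea behind SynchronizationManagerSupport.
class ContextRegistry<E, C> {

    // Each thread keeps its own stack of "current" executions.
    private final ThreadLocal<Deque<E>> current = ThreadLocal.withInitial(ArrayDeque::new);

    // Contexts are shared between threads and keyed by execution.
    private final Map<E, C> contexts = new ConcurrentHashMap<>();

    // Creates a context for an execution, e.g. execution -> new JobContext(execution).
    private final Function<E, C> factory;

    ContextRegistry(Function<E, C> factory) {
        this.factory = factory;
    }

    // Makes the execution current for this thread and returns its (possibly new) context.
    C register(E execution) {
        current.get().push(execution);
        return contexts.computeIfAbsent(execution, factory);
    }

    // Returns the context of this thread's current execution, or null if none is registered.
    C getContext() {
        E execution = current.get().peek();
        return execution == null ? null : contexts.get(execution);
    }

    // Simplification: drops the context as soon as the current execution is closed.
    void close() {
        E execution = current.get().poll();
        if (execution != null) {
            contexts.remove(execution);
        }
    }
}
```

A thread that calls register sees its own context through getContext, while a thread that never registered sees null.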

2. Validate the jobParameters

jobParametersValidator.validate(execution.getJobParameters());

The jobParameters must not be null; if they are, a JobParametersInvalidException is thrown.

3. Execute the beforeJob method of the listeners

listener.beforeJob(execution);

4. Call doExecute to run the job

doExecute(execution);

II. org.springframework.batch.core.job.flow.FlowJob#doExecute runs the job

1. Create a JobFlowExecutor, which is used by components that need to execute a flow related to a JobExecution

JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
      new SimpleStepHandler(getJobRepository()), execution);

SimpleStepHandler manages the repository and restart concerns. Constructing a SimpleStepHandler this way also creates a new ExecutionContext.

new ExecutionContext()

This class wraps a ConcurrentHashMap and provides type-safe read operations.
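To make "type-safe read" concrete, here is a minimal sketch of the pattern. TypedContext and its read helper are hypothetical names chosen for illustration; the real ExecutionContext offers typed getters such as getString and getInt, and its internals may differ from this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of a context that wraps a ConcurrentHashMap.
class TypedContext {

    private final Map<String, Object> map = new ConcurrentHashMap<>();

    void put(String key, Object value) {
        map.put(key, value);
    }

    String getString(String key) {
        return read(key, String.class);
    }

    int getInt(String key) {
        return read(key, Integer.class);
    }

    // Checks the stored type before casting, so a wrong type fails with a clear message.
    // A missing key is rejected as well, because isInstance(null) is false.
    private <T> T read(String key, Class<T> type) {
        Object value = map.get(key);
        if (!type.isInstance(value)) {
            throw new ClassCastException(
                    "Value for key=[" + key + "] is not of type " + type.getName());
        }
        return type.cast(value);
    }
}
```

The caller never casts: getInt("count") either returns an int or fails immediately with the offending key in the message.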

2. The job is actually run by flow.start(executor); flow is the SimpleFlow built from the job defined in the configuration file

executor.updateJobExecutionStatus(flow.start(executor).getStatus());

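SimpleFlow's startState field is a StepState whose value is the definition of the job's first step. Its name is the job id and the first step's id joined by a dot.

The start method declares a state, assigns startState to it, reads the stateName (job id, dot, first step id), and finally calls resume.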

if (startState == null) {
   initializeTransitions();
}
State state = startState;
String stateName = state.getName();
return resume(stateName, executor);

III. org.springframework.batch.core.job.flow.support.SimpleFlow#resume

The core of resume is the call to the state's handle method

while (isFlowContinued(state, status, stepExecution)) {
   stateName = state.getName();
   try {
      status = state.handle(executor);
      stepExecution = executor.getStepExecution();
   }
   // (catch clauses omitted in this excerpt)
   state = nextState(stateName, status, stepExecution);
}
FlowExecution result = new FlowExecution(stateName, status);
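The loop above is a small state machine: handle the current state, then look up the next state from the state name and the resulting status. The sketch below shows that shape under simplifying assumptions. MiniFlow, its string statuses, and the rule that the flow ends when no transition matches are all hypothetical; SimpleFlow uses State, FlowExecutionStatus, and explicit end states instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical miniature of a flow: named states plus status-based transitions.
class MiniFlow {

    // State name -> handler that runs the state and returns a status string.
    private final Map<String, Supplier<String>> states = new HashMap<>();

    // "stateName:status" -> name of the next state.
    private final Map<String, String> transitions = new HashMap<>();

    // Records the order in which states were handled.
    final List<String> visited = new ArrayList<>();

    MiniFlow state(String name, Supplier<String> handler) {
        states.put(name, handler);
        return this;
    }

    // Assumes that "to" names a state registered with state(...).
    MiniFlow transition(String from, String onStatus, String to) {
        transitions.put(from + ":" + onStatus, to);
        return this;
    }

    // Handles states until no transition matches, then returns the last status.
    String run(String startState) {
        String stateName = startState;
        String status = "UNKNOWN";
        while (stateName != null) {
            visited.add(stateName);
            status = states.get(stateName).get();
            stateName = transitions.get(stateName + ":" + status);
        }
        return status;
    }
}
```

With a transition from step1 to step2 on COMPLETED and no transition out of step2, running from step1 visits both states and returns the status of step2.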

IV. org.springframework.batch.core.job.flow.support.state.StepState#handle has only two statements

executor.abandonStepExecution();
return new FlowExecutionStatus(executor.executeStep(step));

When a new step starts, the previous execution may be updated first, so that if it failed it is abandoned on this run.

executor.executeStep(step)

org.springframework.batch.core.job.flow.JobFlowExecutor implements the FlowExecutor interface, which provides FlowJob with the context and execution strategy for step-by-step execution.

The core of org.springframework.batch.core.job.flow.JobFlowExecutor#executeStep

StepExecution stepExecution = stepHandler.handleStep(step, execution);

org.springframework.batch.core.job.SimpleStepHandler#handleStep

currentStepExecution = execution.createStepExecution(step.getName());
currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
step.execute(currentStepExecution);
currentStepExecution.getExecutionContext().put("batch.executed", true);

org.springframework.batch.core.step.AbstractStep#execute

doExecute(stepExecution);

org.springframework.batch.core.step.tasklet.TaskletStep#doExecute

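This method creates a Semaphore. The semaphore is shared per step execution, so that other step executions can run in parallel without needing the lock.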

stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

   @Override
   public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
         throws Exception {

      StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

      // Before starting a new transaction, check for
      // interruption.
      interruptionPolicy.checkInterrupted(stepExecution);

      RepeatStatus result;
      try {
         result = new TransactionTemplate(transactionManager, transactionAttribute)
         .execute(new ChunkTransactionCallback(chunkContext, semaphore));
      }
      catch (UncheckedTransactionException e) {
         // Allow checked exceptions to be thrown inside callback
         throw (Exception) e.getCause();
      }

      chunkListener.afterChunk(chunkContext);

      // Check for interruption after transaction as well, so that
      // the interrupted exception is correctly propagated up to
      // caller
      interruptionPolicy.checkInterrupted(stepExecution);

      return result;
   }

});

org.springframework.batch.repeat.support.RepeatTemplate#iterate executes the batch callback until the completion policy decides that the batch is finished. It waits for the whole batch to complete before returning.

result = executeInternal(callback);

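org.springframework.batch.repeat.support.RepeatTemplate#executeInternal is the internal method that loops over the interceptors and the batch callback

RepeatInternalState state = createInternalState(context);

while (running) {
    // (before-interceptors omitted)
    if (running) {
        result = getNextResult(context, callback, state);
        executeAfterInterceptors(context, result);
        // (exception handling and completion check omitted)
    }
}
result = result.and(waitForResults(state));

createInternalState creates a RepeatInternalStateSupport object (in TaskExecutorRepeatTemplate, its ResultQueueInternalState subclass) whose results field is a ResultHolderResultQueue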

new RepeatInternalStateSupport();

this.results = new ResultHolderResultQueue(throttleLimit);

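A ResultHolderResultQueue creates its queue and semaphore from the throttleLimit configured for the job, so that the step can run on multiple threads. throttleLimit is the maximum number of results that can be expected at any one time.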

results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
waits = new Semaphore(throttleLimit);
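The queue-plus-semaphore pairing can be illustrated with a small sketch. ThrottledResults is a hypothetical class, not the Spring Batch one. It assumes that a permit is taken when a result is announced and handed back when the result arrives, and it uses a plain FIFO queue where the original uses a priority queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical miniature of a result queue that limits outstanding results.
class ThrottledResults<T> {

    private final BlockingQueue<T> results = new LinkedBlockingQueue<>();

    // One permit per result that may be outstanding at the same time.
    private final Semaphore waits;

    ThrottledResults(int throttleLimit) {
        this.waits = new Semaphore(throttleLimit);
    }

    // Announces that a result will arrive; blocks while throttleLimit results are outstanding.
    void expect() {
        waits.acquireUninterruptibly();
    }

    // Stores a finished result and frees a slot so that another task can be submitted.
    void put(T result) {
        results.add(result);
        waits.release();
    }

    // Blocks until a result is available.
    T take() {
        try {
            return results.take();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a result", e);
        }
    }

    int freeSlots() {
        return waits.availablePermits();
    }
}
```

With a limit of 2, a third expect() call blocks until one of the two announced results has been put.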

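V. org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate#getNextResult

This method uses the TaskExecutor to run the callback on multiple threads and obtain a result. The internal state is a queue of result holders that have not finished yet. When the method exits, no holder with a return value should remain in the queue. The queue is scoped to the calling method, so access to it does not need to be synchronized.

Every ExecutingRunnable is created with the same context, so the results from multiple threads go into that one context.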

ExecutingRunnable runnable = null;
ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
do {
   /*
    * Wrap the callback in a runnable that will add its result to the
    * queue when it is ready.
    */
   runnable = new ExecutingRunnable(callback, context, queue);
   /**
    * Tell the runnable that it can expect a result. This could have
    * been in-lined with the constructor, but it might block, so it's
    * better to do it here, since we have the option (it's a private
    * class).
    */
   runnable.expect();
   taskExecutor.execute(runnable);
   /*
    * Allow termination policy to update its state. This must happen
    * immediately before or after the call to the task executor.
    */
   update(context);
   /*
    * Keep going until we get a result that is finished, or early
    * termination...
    */
} while (queue.isEmpty() && !isComplete(context));
/*
 * N.B. If the queue is empty then take() blocks until a result appears,
 * and there must be at least one because we just submitted one to the
 * task executor.
 */
ResultHolder result = queue.take();
return result.getResult();
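The submit-until-finished pattern of getNextResult can be sketched with plain java.util.concurrent classes. ParallelRepeat and its parameters are hypothetical, and the sketch folds the outer repeat loop and the final wait for in-flight work into one method; it shows the throttling idea only, not the exact control flow of TaskExecutorRepeatTemplate.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: keep submitting the same callback until one run reports "finished".
class ParallelRepeat {

    // The callback returns true for "continuable" and false for "finished".
    // Returns the number of times the callback was invoked.
    static int repeat(int threads, int throttleLimit, BooleanSupplier callback) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Semaphore permits = new Semaphore(throttleLimit);
        AtomicBoolean finished = new AtomicBoolean(false);
        AtomicInteger invocations = new AtomicInteger();
        try {
            while (!finished.get()) {
                // At most throttleLimit callbacks are in flight at once.
                permits.acquireUninterruptibly();
                pool.execute(() -> {
                    try {
                        invocations.incrementAndGet();
                        if (!callback.getAsBoolean()) {
                            finished.set(true);
                        }
                    }
                    catch (RuntimeException e) {
                        // Simplification: any failure ends the repetition.
                        finished.set(true);
                    }
                    finally {
                        permits.release();
                    }
                });
            }
            // Wait for every submitted callback to hand its permit back.
            permits.acquireUninterruptibly(throttleLimit);
        }
        finally {
            pool.shutdown();
        }
        return invocations.get();
    }
}
```

Because several callbacks can already be running when one of them reports completion, the callback may be invoked a few more times than strictly needed, so it has to tolerate being called after the data is exhausted.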

The run method of ExecutingRunnable invokes the callback that TaskletStep#doExecute passed to stepOperations.iterate

result = callback.doInIteration(context);

The doInIteration method of that callback runs the logic that produces the result

result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));

The execute method eventually calls

result = action.doInTransaction(status);

ChunkTransactionCallback is an inner class of TaskletStep.

The core of org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction:

result = tasklet.execute(contribution, chunkContext);

org.springframework.batch.core.step.item.ChunkOrientedTasklet#execute

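The type of chunkProvider in execute depends on whether a skip-policy is configured for the chunk in the configuration file.

If none is configured, it is org.springframework.batch.core.step.item.SimpleChunkProvider.

If one is configured, it is org.springframework.batch.core.step.item.FaultTolerantChunkProvider, a subclass of SimpleChunkProvider, so calls such as provide still run the code defined in SimpleChunkProvider unless the subclass overrides them. The process call belongs to chunkProcessor, whose fault-tolerant variant is FaultTolerantChunkProcessor.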

Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
   inputs = chunkProvider.provide(contribution);
   if (buffering) {
      chunkContext.setAttribute(INPUTS_KEY, inputs);
   }
}
chunkProcessor.process(contribution, inputs);
chunkProvider.postProcess(contribution, inputs);

// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
   logger.debug("Inputs still busy");
   return RepeatStatus.CONTINUABLE;
}

chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();
return RepeatStatus.continueIf(!inputs.isEnd());
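The provide, process, write sequence of one chunk can be sketched without any framework classes. MiniChunkStep and its parameters are hypothetical, and the sketch leaves out transactions, buffering, and skip handling. It does show one thing the excerpt implies: within a single call, the three phases run one after another on the calling thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of chunk-oriented processing.
class MiniChunkStep {

    // Reads, processes and writes chunk by chunk; returns the number of chunks written.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
            Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        boolean end = false;
        while (!end) {
            // provide: read up to chunkSize items, or stop when the input is exhausted
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize) {
                if (!reader.hasNext()) {
                    end = true;
                    break;
                }
                inputs.add(reader.next());
            }
            // process: transform each item; a null result means the item is filtered out
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O output = processor.apply(item);
                if (output != null) {
                    outputs.add(output);
                }
            }
            // write: hand the whole chunk to the writer in one call
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
                chunks++;
            }
        }
        return chunks;
    }
}
```

For the inputs 1 to 5 with a chunk size of 2 and a processor that filters even numbers, the writer is called three times, once for each chunk that still has output.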

1. org.springframework.batch.core.step.item.SimpleChunkProvider#provide

final Chunk<I> inputs = new Chunk<I>();
repeatOperations.iterate(new RepeatCallback() {

   @Override
   public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
      I item = null;
      try {
         item = read(contribution, inputs);
      }
      catch (SkipOverflowException e) {
         // read() tells us about an excess of skips by throwing an
         // exception
         return RepeatStatus.FINISHED;
      }
      if (item == null) {
         inputs.setEnd();
         return RepeatStatus.FINISHED;
      }
      inputs.add(item);
      contribution.incrementReadCount();
      return RepeatStatus.CONTINUABLE;
   }

});

return inputs;

repeatOperations, like stepOperations in TaskletStep#doExecute above, is an org.springframework.batch.repeat.support.RepeatTemplate; only the doInIteration implementation of the callback passed in is different.

2. org.springframework.batch.core.step.item.SimpleChunkProcessor#process

// Allow temporary state to be stored in the user data field
initializeUserData(inputs);
// If there is no input we don't have to do anything more
if (isComplete(inputs)) {
   return;
}
// Make the transformation, calling remove() on the inputs iterator if
// any items are filtered. Might throw exception and cause rollback.
Chunk<O> outputs = transform(contribution, inputs);
// Adjust the filter count based on available data
contribution.incrementFilterCount(getFilterCount(inputs, outputs));
// Adjust the outputs if necessary for housekeeping purposes, and then
// write them out...
write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#transform calls the processor configured in the configuration file from inside the doWithRetry method of the RetryCallback that it defines

Chunk<O> outputs = new Chunk<O>();
@SuppressWarnings("unchecked")
final UserData<O> data = (UserData<O>) inputs.getUserData();
final Chunk<O> cache = data.getOutputs();
final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
final AtomicInteger count = new AtomicInteger(0);

// final int scanLimit = processorTransactional && data.scanning() ? 1 :
// 0;

for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {

   final I item = iterator.next();

   RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {

      @Override
      public O doWithRetry(RetryContext context) throws Exception {
         O output = null;
         try {
            count.incrementAndGet();
            O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
            if (cached != null && !processorTransactional) {
               output = cached;
            }
            else {
               output = doProcess(item);
               if (output == null) {
                  data.incrementFilterCount();
               } else if (!processorTransactional && !data.scanning()) {
                  cache.add(output);
               }
            }
         }
         catch (Exception e) {
            if (rollbackClassifier.classify(e)) {
               // Default is to rollback unless the classifier
               // allows us to continue
               throw e;
            }
            else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
               // If we are not re-throwing then we should check if
               // this is skippable
               contribution.incrementProcessSkipCount();
               logger.debug("Skipping after failed process with no rollback", e);
               // If not re-throwing then the listener will not be
               // called in next chunk.
               callProcessSkipListener(item, e);
            }
            else {
               // If it's not skippable that's an error in
               // configuration - it doesn't make sense to not roll
               // back if we are also not allowed to skip
               throw new NonSkippableProcessException(
                     "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
                     e);
            }
         }
         if (output == null) {
            // No need to re-process filtered items
            iterator.remove();
         }
         return output;
      }

   };

   RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {

      @Override
      public O recover(RetryContext context) throws Exception {
         Throwable e = context.getLastThrowable();
         if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
            iterator.remove(e);
            contribution.incrementProcessSkipCount();
            logger.debug("Skipping after failed process", e);
            return null;
         }
         else {
            if (rollbackClassifier.classify(e)) {
               // Default is to rollback unless the classifier
               // allows us to continue
               throw new RetryException("Non-skippable exception in recoverer while processing", e);
            }
            iterator.remove(e);
            return null;
         }
      }

   };

   O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
         getInputKey(item), rollbackClassifier));
   if (output != null) {
      outputs.add(output);
   }

   /*
    * We only want to process the first item if there is a scan for a
    * failed item.
    */
   if (data.scanning()) {
      while (cacheIterator != null && cacheIterator.hasNext()) {
         outputs.add(cacheIterator.next());
      }
      // Only process the first item if scanning
      break;
   }
}

return outputs;

org.springframework.batch.core.step.item.BatchRetryTemplate#execute(org.springframework.retry.RetryCallback<T,E>, org.springframework.retry.RecoveryCallback<T>, org.springframework.retry.RetryState)

O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
         getInputKey(item), rollbackClassifier));

This eventually calls org.springframework.retry.support.RetryTemplate#doExecute, whose core code is:

RetryContext context = this.open(retryPolicy, state);
RetrySynchronizationManager.register(context);
try {
    boolean running = this.doOpenInterceptors(retryCallback, context);
    while(this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
        try {
            lastException = null;
            var24 = retryCallback.doWithRetry(context);
            return var24;
        } catch (Throwable var21) {
        }
    }
    var24 = this.handleRetryExhausted(recoveryCallback, context, state);
} catch (Throwable var22) {
    throw wrapIfNecessary(var22);
} finally {
    this.close(retryPolicy, context, state, lastException == null);
    this.doCloseInterceptors(retryCallback, context, lastException);
    RetrySynchronizationManager.clear();
}
return var24;
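Stripped of interceptors and policy objects, the retry loop above amounts to "try the callback until it succeeds or the attempts run out, then fall back to recovery". The sketch below shows only that stateless core. MiniRetry and the fixed maxAttempts parameter are hypothetical. The call in FaultTolerantChunkProcessor also passes a DefaultRetryState, which makes the retry stateful; as far as I understand it, the exception is then rethrown so that the transaction can roll back and the item is retried in a later call, which this sketch does not model.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical miniature of a stateless retry loop with a recovery path.
class MiniRetry {

    // Calls action up to maxAttempts times; if every attempt fails, delegates to recovery.
    static <T> T execute(int maxAttempts, Callable<T> action, Function<Throwable, T> recovery) {
        Throwable last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            }
            catch (Exception e) {
                // Remember the failure and try again while attempts remain.
                last = e;
            }
        }
        // Retries are exhausted: let the recovery callback produce the result.
        return recovery.apply(last);
    }
}
```

An action that fails twice and then succeeds returns its own result when maxAttempts is 3, while an action that always fails returns whatever the recovery function produces.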

3. org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#write

final UserData<O> data = (UserData<O>) inputs.getUserData();
final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
   @Override
   public Object doWithRetry(RetryContext context) throws Exception {
      contextHolder.set(context);

      if (!data.scanning()) {
         chunkMonitor.setChunkSize(inputs.size());
         try {
            doWrite(outputs.getItems());
         }
         catch (Exception e) {
            if (rollbackClassifier.classify(e)) {
               throw e;
            }
            /*
             * If the exception is marked as no-rollback, we need to
             * override that, otherwise there's no way to write the
             * rest of the chunk or to honour the skip listener
             * contract.
             */
            throw new ForceRollbackForWriteSkipException(
                  "Force rollback on skippable exception so that skipped item can be located.", e);
         }
         contribution.incrementWriteCount(outputs.size());
      }
      else {
         scan(contribution, inputs, outputs, chunkMonitor, false);
      }
      return null;

   }
};

if (!buffering) {

   RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {

      @Override
      public Object recover(RetryContext context) throws Exception {

         Throwable e = context.getLastThrowable();
         if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
            throw new RetryException("Invalid retry state during write caused by "
                  + "exception that does not classify for rollback: ", e);
         }

         Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
         for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {

            inputIterator.next();
            outputIterator.next();

            checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
            if (!rollbackClassifier.classify(e)) {
               throw new RetryException(
                     "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
                     e);
            }

         }

         return null;

      }

   };

   batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
         BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

}
else {

   RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

      @Override
      public Object recover(RetryContext context) throws Exception {
         /*
          * If the last exception was not skippable we don't need to
          * do any scanning. We can just bomb out with a retry
          * exhausted.
          */
         if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
            throw new ExhaustedRetryException(
                  "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
                  context.getLastThrowable());
         }

         inputs.setBusy(true);
         data.scanning(true);
         scan(contribution, inputs, outputs, chunkMonitor, true);
         return null;
      }

   };

   try {
      batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
            rollbackClassifier));
   }
   catch (Exception e) {
      RetryContext context = contextHolder.get();
      if (!batchRetryTemplate.canRetry(context)) {
         /*
          * BATCH-1761: we need advance warning of the scan about to
          * start in the next transaction, so we can change the
          * processing behaviour.
          */
         data.scanning(true);
      }
      throw e;
   }

}

callSkipListeners(inputs, outputs);
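The scanning branch exists because a failed batch write does not say which item caused the failure. A rough sketch of the idea: if the batch write fails, write the items one at a time and skip only those that fail on their own. ScanningWriter is hypothetical and assumes that a failed batch write leaves nothing behind. In Spring Batch that guarantee comes from the transaction rollback and the scan happens in later transactions, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of "scan after a failed chunk write".
class ScanningWriter {

    // Writes all items; returns the items that had to be skipped.
    // Assumes writer is atomic: a call that throws has written nothing.
    static <T> List<T> write(List<T> items, Consumer<List<T>> writer) {
        List<T> skipped = new ArrayList<>();
        try {
            // Fast path: the whole chunk in one call.
            writer.accept(items);
            return skipped;
        }
        catch (RuntimeException e) {
            // Fall through to the scan below.
        }
        // Scan: one item per call, so that a failure identifies the offending item.
        for (T item : items) {
            try {
                writer.accept(Collections.singletonList(item));
            }
            catch (RuntimeException e) {
                skipped.add(item);
            }
        }
        return skipped;
    }
}
```

For the chunk 1, -2, 3 and a writer that rejects negative numbers, the batch write fails, the scan writes 1 and 3 individually, and -2 is reported as skipped.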


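Open questions:

1. When a step's chunk runs, do the reader, processor, and writer execute in the same thread?

2. If the reader, processor, and writer can run in different threads, where in the source is that scheduled? I did not find any such code.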

    Original author: Spring Cloud
    Original URL: https://blog.csdn.net/ray1205/article/details/79546099