I. When a batch job is launched, job.execute(jobExecution) is called. Here job is a FlowJob, and jobExecution is created by jobRepository.createJobExecution(...).
FlowJob extends org.springframework.batch.core.job.AbstractJob, so the job runs through AbstractJob#execute. That method drives the job: it handles all listeners and JobRepository calls and delegates the actual processing to the subclass's doExecute method.
1. Register the job context for the current thread
    JobSynchronizationManager.register(execution);
org.springframework.batch.core.scope.context.JobSynchronizationManager#register delegates to the register method of JobSynchronizationManager's manager field, which by default is
    new SynchronizationManagerSupport<JobExecution, JobContext>()
SynchronizationManagerSupport keeps, per thread, the current execution, plus a map from each execution to its context (new JobContext(execution)).
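A minimal sketch of what this registration makes possible, assuming a Spring Batch 3.x/4.x classpath; the JobExecution is built by hand here purely for illustration (normally the JobRepository creates it):

    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobInstance;
    import org.springframework.batch.core.scope.context.JobContext;
    import org.springframework.batch.core.scope.context.JobSynchronizationManager;

    public class JobContextSketch {
        public static void main(String[] args) {
            // Hand-built execution, for illustration only.
            JobExecution execution = new JobExecution(1L);
            execution.setJobInstance(new JobInstance(1L, "demoJob"));

            // What AbstractJob#execute does before running the job:
            JobContext registered = JobSynchronizationManager.register(execution);
            try {
                // Any code running on this thread sees the same context.
                JobContext current = JobSynchronizationManager.getContext();
                System.out.println(current.getJobName() + " " + (current == registered)); // demoJob true
            } finally {
                // AbstractJob releases the registration in a finally block.
                JobSynchronizationManager.release();
            }
        }
    }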
2. Validate the jobParameters
    jobParametersValidator.validate(execution.getJobParameters());
The jobParameters must not be null; if they are, a JobParametersInvalidException is thrown.
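The validator AbstractJob delegates to is pluggable; a minimal sketch using DefaultJobParametersValidator (the parameter names inputFile and run.date are made up):

    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.JobParametersInvalidException;
    import org.springframework.batch.core.job.DefaultJobParametersValidator;

    public class ValidatorSketch {
        public static void main(String[] args) throws JobParametersInvalidException {
            // Require "inputFile", allow an optional "run.date".
            DefaultJobParametersValidator validator = new DefaultJobParametersValidator(
                    new String[] { "inputFile" }, new String[] { "run.date" });

            JobParameters ok = new JobParametersBuilder()
                    .addString("inputFile", "/tmp/data.csv")
                    .toJobParameters();
            validator.validate(ok); // passes

            validator.validate(new JobParameters()); // throws JobParametersInvalidException
        }
    }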
3. Invoke the listeners' beforeJob methods
    listener.beforeJob(execution);
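A minimal sketch of such a listener; the class name and logging are illustrative only:

    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;

    // AbstractJob#execute calls beforeJob before doExecute and afterJob after it.
    public class LoggingJobListener implements JobExecutionListener {

        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println("about to run " + jobExecution.getJobInstance().getJobName());
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("finished with status " + jobExecution.getStatus());
        }
    }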
4. Call doExecute to run the job
    doExecute(execution);
II. org.springframework.batch.core.job.flow.FlowJob#doExecute runs the job
1. Create a JobFlowExecutor. A JobFlowExecutor is meant for components that need to execute a flow tied to a JobExecution.
    JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
            new SimpleStepHandler(getJobRepository()), execution);
SimpleStepHandler manages the repository and the restart logic; constructing it also creates a new ExecutionContext:
    new ExecutionContext()
ExecutionContext wraps a ConcurrentHashMap and provides type-safe read accessors.
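A minimal sketch of those typed accessors on ExecutionContext (keys and values are made up):

    import org.springframework.batch.item.ExecutionContext;

    public class ExecutionContextSketch {
        public static void main(String[] args) {
            ExecutionContext context = new ExecutionContext();
            // put/get pairs are typed, so readers get type-safe access to the shared map.
            context.putLong("read.count", 42L);
            context.putString("last.file", "data-001.csv");

            long readCount = context.getLong("read.count");       // 42
            String lastFile = context.getString("last.file");     // "data-001.csv"
            long missing = context.getLong("not.there", -1L);     // default when the key is absent
            System.out.println(readCount + " " + lastFile + " " + missing);
        }
    }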
2. The actual work is done by flow.start(executor); flow is the SimpleFlow generated from the job definition in the configuration file.
    executor.updateJobExecutionStatus(flow.start(executor).getStatus());
SimpleFlow's startState field is a StepState pointing at the job's first step definition; its name is "<job id>.<first step id>".
In start, a local state variable is assigned startState, its stateName ("<job id>.<first step id>") is read, and resume is called:
    if (startState == null) {
        initializeTransitions();
    }
    State state = startState;
    String stateName = state.getName();
    return resume(stateName, executor);
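For reference, a rough Java-config sketch of a one-step job of the kind this walkthrough follows (the walkthrough itself assumes an XML <job>; the bean and step names are made up, and the "jobId.stepId" state naming above is how the XML parser names the state):

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    @EnableBatchProcessing // assumes the usual batch infrastructure (DataSource etc.) is present
    public class DemoJobConfig {

        // Building the job through the flow builder yields a FlowJob whose startState
        // is a StepState for step1, analogous to the XML case described above.
        @Bean
        public Job demoJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
            Step step1 = steps.get("step1")
                    .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
                    .build();
            return jobs.get("demoJob").flow(step1).end().build();
        }
    }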
III. org.springframework.batch.core.job.flow.support.SimpleFlow#resume
The core of resume is the loop that calls each state's handle method (exception handling abbreviated):
    while (isFlowContinued(state, status, stepExecution)) {
        stateName = state.getName();
        try {
            status = state.handle(executor);
            stepExecution = executor.getStepExecution();
        }
        catch (FlowExecutionException e) {
            executor.close(new FlowExecution(stateName, status));
            throw e;
        }
        state = nextState(stateName, status, stepExecution);
    }
    FlowExecution result = new FlowExecution(stateName, status);
IV. org.springframework.batch.core.job.flow.support.state.StepState#handle is only two statements:
    executor.abandonStepExecution();
    return new FlowExecutionStatus(executor.executeStep(step));
Before starting a new step, the previous execution is updated so that, if it failed, it is marked abandoned on this start.
    executor.executeStep(step)
org.springframework.batch.core.job.flow.JobFlowExecutor implements the FlowExecutor interface, which provides the context and execution strategy for a FlowJob to execute step by step.
The core of org.springframework.batch.core.job.flow.JobFlowExecutor#executeStep is
    StepExecution stepExecution = stepHandler.handleStep(step, execution);
org.springframework.batch.core.job.SimpleStepHandler#handleStep:
    currentStepExecution = execution.createStepExecution(step.getName());
    currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
    step.execute(currentStepExecution);
    currentStepExecution.getExecutionContext().put("batch.executed", true);
org.springframework.batch.core.step.AbstractStep#execute in turn calls
    doExecute(stepExecution);
org.springframework.batch.core.step.tasklet.TaskletStep#doExecute
This method creates a Semaphore so that the step's work can run concurrently without taking a lock (the semaphore only serializes the update of the shared StepExecution).
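A conceptual sketch of that idea, not the TaskletStep source: a single-permit semaphore serializes only the small shared update while the expensive work runs concurrently (all names here are made up):

    import java.util.concurrent.Semaphore;

    public class SemaphoreUpdateSketch {

        private final Semaphore semaphore = new Semaphore(1);
        private int sharedCounter; // stands in for the shared StepExecution state

        public void processChunk(int chunkSize) throws InterruptedException {
            expensiveBusinessWork(chunkSize);   // runs concurrently, no lock held here
            semaphore.acquire();                // only the shared update is serialized
            try {
                sharedCounter += chunkSize;
            } finally {
                semaphore.release();
            }
        }

        private void expensiveBusinessWork(int chunkSize) throws InterruptedException {
            Thread.sleep(10L * chunkSize);      // placeholder for real chunk processing
        }
    }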
    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

        @Override
        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) throws Exception {

            StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

            // Before starting a new transaction, check for interruption.
            interruptionPolicy.checkInterrupted(stepExecution);

            RepeatStatus result;
            try {
                result = new TransactionTemplate(transactionManager, transactionAttribute)
                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
            }
            catch (UncheckedTransactionException e) {
                // Allow checked exceptions to be thrown inside callback
                throw (Exception) e.getCause();
            }

            chunkListener.afterChunk(chunkContext);

            // Check for interruption after transaction as well, so that
            // the interrupted exception is correctly propagated up to caller
            interruptionPolicy.checkInterrupted(stepExecution);

            return result;
        }
    });
org.springframework.batch.repeat.support.RepeatTemplate#iterate executes the batch callback until the completion policy decides the batch is complete, and waits for the whole batch to finish before returning.
    result = executeInternal(callback);
org.springframework.batch.repeat.support.RepeatTemplate#executeInternal is the internal loop that runs the interceptors and the batch callback (simplified):
    RepeatInternalState state = createInternalState(context);
    // ...
    while (running) {
        // ...
        if (running) {
            result = getNextResult(context, callback, state);
            executeAfterInterceptors(context, result);
            // ...
        }
        // ...
    }
    result = result.and(waitForResults(state));
createInternalState creates a RepeatInternalStateSupport object whose results field is of type ResultHolderResultQueue:
    new RepeatInternalStateSupport();
    this.results = new ResultHolderResultQueue(throttleLimit);
The ResultHolderResultQueue builds its queue and semaphore from the throttle-limit configured on the job and is what allows the step to run on multiple threads; throttleLimit is the maximum number of results that may be outstanding (expected but not yet collected) at any time:
    results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
    waits = new Semaphore(throttleLimit);
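A conceptual sketch of what such a throttle-limited result queue does, not the ResultHolderResultQueue source: the semaphore caps how many results may be expected but not yet taken (class and method names are made up):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;

    public class ThrottledResultQueue<T> {

        private final BlockingQueue<T> results = new LinkedBlockingQueue<>();
        private final Semaphore waits;

        public ThrottledResultQueue(int throttleLimit) {
            this.waits = new Semaphore(throttleLimit);
        }

        // Called before submitting work; blocks once throttleLimit results are outstanding.
        public void expect() throws InterruptedException {
            waits.acquire();
        }

        // Called by the worker thread when its result is ready.
        public void put(T result) {
            results.add(result);
        }

        // Called by the submitting thread; releasing the permit lets more work be submitted.
        public T take() throws InterruptedException {
            T result = results.take();
            waits.release();
            return result;
        }
    }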
V. org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate#getNextResult
This method uses the TaskExecutor to run the work on multiple threads and obtain a result. The internal state is a queue of result holders that have not finished yet; by the time this method returns, any holder that already has a result should no longer be in the queue. The queue is scoped to the calling method, so access to it does not need to be synchronized.
Every ExecutingRunnable is created with the same context, i.e. the results produced by the different threads all end up in the same context.
    ExecutingRunnable runnable = null;
    ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
    do {

        /*
         * Wrap the callback in a runnable that will add its result to the
         * queue when it is ready.
         */
        runnable = new ExecutingRunnable(callback, context, queue);

        /**
         * Tell the runnable that it can expect a result. This could have
         * been in-lined with the constructor, but it might block, so it's
         * better to do it here, since we have the option (it's a private
         * class).
         */
        runnable.expect();

        taskExecutor.execute(runnable);

        /*
         * Allow termination policy to update its state. This must happen
         * immediately before or after the call to the task executor.
         */
        update(context);

        /*
         * Keep going until we get a result that is finished, or early
         * termination...
         */
    } while (queue.isEmpty() && !isComplete(context));

    /*
     * N.B. If the queue is empty then take() blocks until a result appears,
     * and there must be at least one because we just submitted one to the
     * task executor.
     */
    ResultHolder result = queue.take();
    return result.getResult();
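A minimal, self-contained usage sketch of TaskExecutorRepeatTemplate showing the throttle-limit and completion policy in action (the thread-name prefix and iteration count are made up):

    import java.util.concurrent.atomic.AtomicInteger;

    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
    import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;

    public class ThrottledIterationSketch {
        public static void main(String[] args) {
            TaskExecutorRepeatTemplate template = new TaskExecutorRepeatTemplate();
            template.setTaskExecutor(new SimpleAsyncTaskExecutor("chunk-"));
            template.setThrottleLimit(4);                                 // cap on outstanding results
            template.setCompletionPolicy(new SimpleCompletionPolicy(10)); // stop after 10 iterations

            AtomicInteger counter = new AtomicInteger();
            template.iterate(context -> {
                System.out.println(Thread.currentThread().getName()
                        + " ran iteration " + counter.incrementAndGet());
                return RepeatStatus.CONTINUABLE; // keep going until the policy says complete
            });
        }
    }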
ExecutingRunnable#run invokes the callback that TaskletStep#doExecute passed into stepOperations.iterate:
    result = callback.doInIteration(context);
In that callback's doInIteration method (shown earlier), the result is obtained with
    result = new TransactionTemplate(transactionManager, transactionAttribute)
            .execute(new ChunkTransactionCallback(chunkContext, semaphore));
TransactionTemplate#execute eventually calls
    result = action.doInTransaction(status);
ChunkTransactionCallback is an inner class of TaskletStep; the core of org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction is
    result = tasklet.execute(contribution, chunkContext);
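For a plain (non-chunk) step the tasklet invoked here is whatever Tasklet the step is configured with; a minimal sketch of such a tasklet (the class name and logging are made up). For a chunk-oriented step the tasklet is instead the ChunkOrientedTasklet described next:

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;

    public class CleanupTasklet implements Tasklet {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
            String stepName = chunkContext.getStepContext().getStepName();
            System.out.println("running cleanup inside the chunk transaction for " + stepName);
            return RepeatStatus.FINISHED; // FINISHED ends the repeat loop for this tasklet
        }
    }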
org.springframework.batch.core.step.item.ChunkOrientedTasklet#execute
The concrete type of chunkProvider used in execute depends on whether a skip policy is configured for the chunk in the configuration file:
if not, it is org.springframework.batch.core.step.item.SimpleChunkProvider;
if it is, it is org.springframework.batch.core.step.item.FaultTolerantChunkProvider, a subclass of SimpleChunkProvider, so when provide and postProcess run, the logic defined in SimpleChunkProvider is still largely what executes.
    Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
    if (inputs == null) {
        inputs = chunkProvider.provide(contribution);
        if (buffering) {
            chunkContext.setAttribute(INPUTS_KEY, inputs);
        }
    }

    chunkProcessor.process(contribution, inputs);
    chunkProvider.postProcess(contribution, inputs);

    // Allow a message coming back from the processor to say that we
    // are not done yet
    if (inputs.isBusy()) {
        logger.debug("Inputs still busy");
        return RepeatStatus.CONTINUABLE;
    }

    chunkContext.removeAttribute(INPUTS_KEY);
    chunkContext.setComplete();
    return RepeatStatus.continueIf(!inputs.isEnd());
1. org.springframework.batch.core.step.item.SimpleChunkProvider#provide
    final Chunk<I> inputs = new Chunk<I>();
    repeatOperations.iterate(new RepeatCallback() {

        @Override
        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
            I item = null;
            try {
                item = read(contribution, inputs);
            }
            catch (SkipOverflowException e) {
                // read() tells us about an excess of skips by throwing an exception
                return RepeatStatus.FINISHED;
            }
            if (item == null) {
                inputs.setEnd();
                return RepeatStatus.FINISHED;
            }
            inputs.add(item);
            contribution.incrementReadCount();
            return RepeatStatus.CONTINUABLE;
        }

    });
    return inputs;
repeatOperations here is the same kind of object as stepOperations in TaskletStep#doExecute above, i.e. an org.springframework.batch.repeat.support.RepeatTemplate; only the doInIteration implementation of the callback passed in differs. The items themselves come from the step's configured ItemReader, read one at a time until the chunk is full or the reader is exhausted.
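A minimal stand-alone sketch of that read loop using ListItemReader (the item values are made up):

    import java.util.Arrays;

    import org.springframework.batch.item.support.ListItemReader;

    public class ReaderSketch {
        public static void main(String[] args) throws Exception {
            // The provider keeps calling read() until the chunk is full or read()
            // returns null, which is what triggers inputs.setEnd() in the callback above.
            ListItemReader<String> reader = new ListItemReader<>(Arrays.asList("a", "b", "c"));
            String item;
            while ((item = reader.read()) != null) {
                System.out.println("read item: " + item);
            }
        }
    }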
2. org.springframework.batch.core.step.item.SimpleChunkProcessor#process
    // Allow temporary state to be stored in the user data field
    initializeUserData(inputs);

    // If there is no input we don't have to do anything more
    if (isComplete(inputs)) {
        return;
    }

    // Make the transformation, calling remove() on the inputs iterator if
    // any items are filtered. Might throw exception and cause rollback.
    Chunk<O> outputs = transform(contribution, inputs);

    // Adjust the filter count based on available data
    contribution.incrementFilterCount(getFilterCount(inputs, outputs));

    // Adjust the outputs if necessary for housekeeping purposes, and then
    // write them out...
    write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
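transform and write ultimately call the step's configured ItemProcessor and ItemWriter; a minimal sketch of such a pair (class names and the filtering rule are made up):

    import java.util.List;

    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemWriter;

    // transform() calls the processor for each input item; returning null marks
    // the item as filtered. write() then hands the surviving items to the writer
    // as one chunk.
    public class UpperCaseProcessor implements ItemProcessor<String, String> {
        @Override
        public String process(String item) {
            return item.isEmpty() ? null : item.toUpperCase(); // null filters the item out
        }
    }

    class ConsoleWriter implements ItemWriter<String> {
        @Override
        public void write(List<? extends String> items) {
            System.out.println("writing chunk of " + items.size() + " items: " + items);
        }
    }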
In org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#transform, the processor configured in the configuration file is invoked from the doWithRetry method of the RetryCallback defined inside this method:
    Chunk<O> outputs = new Chunk<O>();
    @SuppressWarnings("unchecked")
    final UserData<O> data = (UserData<O>) inputs.getUserData();
    final Chunk<O> cache = data.getOutputs();
    final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
    final AtomicInteger count = new AtomicInteger(0);

    // final int scanLimit = processorTransactional && data.scanning() ? 1 : 0;

    for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {

        final I item = iterator.next();

        RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {

            @Override
            public O doWithRetry(RetryContext context) throws Exception {
                O output = null;
                try {
                    count.incrementAndGet();
                    O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
                    if (cached != null && !processorTransactional) {
                        output = cached;
                    }
                    else {
                        output = doProcess(item);
                        if (output == null) {
                            data.incrementFilterCount();
                        }
                        else if (!processorTransactional && !data.scanning()) {
                            cache.add(output);
                        }
                    }
                }
                catch (Exception e) {
                    if (rollbackClassifier.classify(e)) {
                        // Default is to rollback unless the classifier
                        // allows us to continue
                        throw e;
                    }
                    else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
                        // If we are not re-throwing then we should check if
                        // this is skippable
                        contribution.incrementProcessSkipCount();
                        logger.debug("Skipping after failed process with no rollback", e);
                        // If not re-throwing then the listener will not be
                        // called in next chunk.
                        callProcessSkipListener(item, e);
                    }
                    else {
                        // If it's not skippable that's an error in
                        // configuration - it doesn't make sense to not roll
                        // back if we are also not allowed to skip
                        throw new NonSkippableProcessException(
                                "Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.",
                                e);
                    }
                }
                if (output == null) {
                    // No need to re-process filtered items
                    iterator.remove();
                }
                return output;
            }

        };

        RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {

            @Override
            public O recover(RetryContext context) throws Exception {
                Throwable e = context.getLastThrowable();
                if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
                    iterator.remove(e);
                    contribution.incrementProcessSkipCount();
                    logger.debug("Skipping after failed process", e);
                    return null;
                }
                else {
                    if (rollbackClassifier.classify(e)) {
                        // Default is to rollback unless the classifier
                        // allows us to continue
                        throw new RetryException("Non-skippable exception in recoverer while processing", e);
                    }
                    iterator.remove(e);
                    return null;
                }
            }

        };

        O output = batchRetryTemplate.execute(retryCallback, recoveryCallback,
                new DefaultRetryState(getInputKey(item), rollbackClassifier));
        if (output != null) {
            outputs.add(output);
        }

        /*
         * We only want to process the first item if there is a scan for a
         * failed item.
         */
        if (data.scanning()) {
            while (cacheIterator != null && cacheIterator.hasNext()) {
                outputs.add(cacheIterator.next());
            }
            // Only process the first item if scanning
            break;
        }
    }

    return outputs;
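shouldSkip(itemProcessSkipPolicy, ...) above consults the step's SkipPolicy; a hypothetical skip policy of that kind (the exception type and limit are made up):

    import org.springframework.batch.core.step.skip.SkipLimitExceededException;
    import org.springframework.batch.core.step.skip.SkipPolicy;

    // Tolerate up to 5 IllegalArgumentExceptions, roll back on anything else.
    public class BadRecordSkipPolicy implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
            if (t instanceof IllegalArgumentException && skipCount < 5) {
                return true;  // item is skipped, processing continues
            }
            return false;     // not skippable, so the exception propagates and rolls back
        }
    }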
org.springframework.batch.core.step.item.BatchRetryTemplate#execute(org.springframework.retry.RetryCallback<T,E>, org.springframework.retry.RecoveryCallback<T>, org.springframework.retry.RetryState)
    O output = batchRetryTemplate.execute(retryCallback, recoveryCallback,
            new DefaultRetryState(getInputKey(item), rollbackClassifier));
This ultimately calls org.springframework.retry.support.RetryTemplate#doExecute; the core (shown here as decompiled code) is:
    RetryContext context = this.open(retryPolicy, state);
    RetrySynchronizationManager.register(context);
    try {
        boolean running = this.doOpenInterceptors(retryCallback, context);
        while (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
                lastException = null;
                var24 = retryCallback.doWithRetry(context);
                return var24;
            }
            catch (Throwable var21) {
                // failure registration and back-off handling elided
            }
        }
        var24 = this.handleRetryExhausted(recoveryCallback, context, state);
    }
    catch (Throwable var22) {
        throw wrapIfNecessary(var22);
    }
    finally {
        this.close(retryPolicy, context, state, lastException == null);
        this.doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
    return var24;
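The same retry/recovery contract can be seen directly on RetryTemplate outside of Spring Batch; a minimal stand-alone sketch (the failure simulation is made up):

    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;

    public class RetrySketch {
        public static void main(String[] args) {
            RetryTemplate template = new RetryTemplate();
            template.setRetryPolicy(new SimpleRetryPolicy(3)); // at most 3 attempts

            String result = template.<String, IllegalStateException>execute(
                    context -> {
                        // Retry callback: re-invoked until it succeeds or attempts are exhausted.
                        if (context.getRetryCount() < 2) {
                            throw new IllegalStateException("transient failure");
                        }
                        return "succeeded on attempt " + (context.getRetryCount() + 1);
                    },
                    // Recovery callback: only invoked once all attempts are exhausted.
                    context -> "recovered: " + context.getLastThrowable().getMessage());

            System.out.println(result);
        }
    }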
3. org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#write
    final UserData<O> data = (UserData<O>) inputs.getUserData();
    final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();

    RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            contextHolder.set(context);

            if (!data.scanning()) {
                chunkMonitor.setChunkSize(inputs.size());
                try {
                    doWrite(outputs.getItems());
                }
                catch (Exception e) {
                    if (rollbackClassifier.classify(e)) {
                        throw e;
                    }
                    /*
                     * If the exception is marked as no-rollback, we need to
                     * override that, otherwise there's no way to write the
                     * rest of the chunk or to honour the skip listener
                     * contract.
                     */
                    throw new ForceRollbackForWriteSkipException(
                            "Force rollback on skippable exception so that skipped item can be located.", e);
                }
                contribution.incrementWriteCount(outputs.size());
            }
            else {
                scan(contribution, inputs, outputs, chunkMonitor, false);
            }
            return null;
        }

    };

    if (!buffering) {

        RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {

            @Override
            public Object recover(RetryContext context) throws Exception {

                Throwable e = context.getLastThrowable();
                if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
                    throw new RetryException("Invalid retry state during write caused by "
                            + "exception that does not classify for rollback: ", e);
                }

                Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
                for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {

                    inputIterator.next();
                    outputIterator.next();

                    checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
                    if (!rollbackClassifier.classify(e)) {
                        throw new RetryException(
                                "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
                                e);
                    }

                }

                return null;

            }

        };

        batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
                BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

    }
    else {

        RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

            @Override
            public Object recover(RetryContext context) throws Exception {
                /*
                 * If the last exception was not skippable we don't need to
                 * do any scanning. We can just bomb out with a retry
                 * exhausted.
                 */
                if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
                    throw new ExhaustedRetryException(
                            "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
                            context.getLastThrowable());
                }

                inputs.setBusy(true);
                data.scanning(true);
                scan(contribution, inputs, outputs, chunkMonitor, true);
                return null;
            }

        };

        try {
            batchRetryTemplate.execute(retryCallback, recoveryCallback,
                    new DefaultRetryState(inputs, rollbackClassifier));
        }
        catch (Exception e) {
            RetryContext context = contextHolder.get();
            if (!batchRetryTemplate.canRetry(context)) {
                /*
                 * BATCH-1761: we need advance warning of the scan about to
                 * start in the next transaction, so we can change the
                 * processing behaviour.
                 */
                data.scanning(true);
            }
            throw e;
        }

    }

    callSkipListeners(inputs, outputs);
Open questions:
1. When a chunk of a step executes, do the reader, processor and writer run in the same thread?
2. If the reader, processor and writer can run in different threads, where in the source is that scheduled? No such code has been found yet.