How Spring Batch Works in Code #Step Creation and Execution

2020. 9. 4. 16:23 Spring Framework/Spring boot

In this post, we'll walk through the code to see how a Job executes its list of Steps.

SimpleJob.doExecute

@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {

    StepExecution stepExecution = null;
    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            //
            // Terminate the job if a step fails
            //
            break;
        }
    }

    //
    // Update the job status to be the same as the last step
    //
    if (stepExecution != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("Upgrading JobExecution status: " + stepExecution);
        }
        execution.upgradeStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
    }
}

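SimpleJob was covered briefly in the previous post. I'm revisiting its doExecute method because this is where the list of Steps gets executed. More precisely, the line stepExecution = handleStep(step, execution); runs each Step. The loop stops at the first Step whose status is not COMPLETED, and the job then takes on the status and exit status of the last Step that ran.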
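To make the control flow of doExecute easier to see, here is a minimal sketch in plain Java with no Spring Batch dependency. SketchStatus, JobLoopSketch, and runSteps are hypothetical names; each step is modelled as a Supplier that returns its final status, and the real code's execution.upgradeStatus(...) / setExitStatus(...) is reduced to returning the last status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for Spring Batch's BatchStatus; only the values used here.
enum SketchStatus { COMPLETED, FAILED, STOPPED }

class JobLoopSketch {

    // Runs each "step" in order and stops at the first one that does not COMPLETE,
    // mirroring the loop in SimpleJob.doExecute. Returns the status of the last step
    // that ran (null if there were no steps); the real code copies it onto the JobExecution.
    static SketchStatus runSteps(List<Supplier<SketchStatus>> steps, List<Integer> executed) {
        SketchStatus last = null;
        for (int i = 0; i < steps.size(); i++) {
            last = steps.get(i).get();   // plays the role of handleStep(step, execution)
            executed.add(i);             // record which steps actually ran
            if (last != SketchStatus.COMPLETED) {
                break;                   // terminate the job if a step fails
            }
        }
        return last;
    }

    public static void main(String[] args) {
        List<Supplier<SketchStatus>> steps = List.of(
                () -> SketchStatus.COMPLETED,
                () -> SketchStatus.FAILED,
                () -> SketchStatus.COMPLETED);
        List<Integer> executed = new ArrayList<>();
        SketchStatus jobStatus = runSteps(steps, executed);
        System.out.println(jobStatus + " after steps " + executed); // FAILED after steps [0, 1]
    }
}
```

With a FAILED second step, the third step never runs, which matches the break in SimpleJob.doExecute.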

AbstractJob.handleStep

protected final StepExecution handleStep(Step step, JobExecution execution)
        throws JobInterruptedException, JobRestartException,
        StartLimitExceededException {
    return stepHandler.handleStep(step, execution);

}

handleStep simply delegates to StepHandler.handleStep, passing the Step and the JobExecution as arguments.

Notice the return type, StepExecution. It looks familiar: its name mirrors the JobExecution object that was needed to run the Job.

SimpleStepHandler.handleStep

@Override
public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
JobRestartException, StartLimitExceededException {
    if (execution.isStopping()) {
        throw new JobInterruptedException("JobExecution interrupted.");
    }

    JobInstance jobInstance = execution.getJobInstance();

    StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
    if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
        // If the last execution of this step was in the same job, it's
        // probably intentional so we want to run it again...
        logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. "
                + "If either step fails, both will be executed again on restart.", step.getName(), jobInstance
                .getJobName()));
        lastStepExecution = null;
    }
    StepExecution currentStepExecution = lastStepExecution;

    if (shouldStart(lastStepExecution, execution, step)) {

        currentStepExecution = execution.createStepExecution(step.getName());

        boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
                BatchStatus.COMPLETED));

        if (isRestart) {
            currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());

            if(lastStepExecution.getExecutionContext().containsKey("batch.executed")) {
                currentStepExecution.getExecutionContext().remove("batch.executed");
            }
        }
        else {
            currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
        }

        jobRepository.add(currentStepExecution);

        logger.info("Executing step: [" + step.getName() + "]");
        try {
            step.execute(currentStepExecution);
            currentStepExecution.getExecutionContext().put("batch.executed", true);
        }
        catch (JobInterruptedException e) {
            // Ensure that the job gets the message that it is stopping
            // and can pass it on to other steps that are executing
            // concurrently.
            execution.setStatus(BatchStatus.STOPPING);
            throw e;
        }

        jobRepository.updateExecutionContext(execution);

        if (currentStepExecution.getStatus() == BatchStatus.STOPPING
                || currentStepExecution.getStatus() == BatchStatus.STOPPED) {
            // Ensure that the job gets the message that it is stopping
            execution.setStatus(BatchStatus.STOPPING);
            throw new JobInterruptedException("Job interrupted by step execution");
        }

    }

    return currentStepExecution;
}

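The handleStep method, which returns this StepExecution, does two main things:

  • Creates a StepExecution: currentStepExecution = execution.createStepExecution(step.getName()); (on a restart, it reuses the ExecutionContext of the previous, unfinished run)
  • Executes the Step: step.execute(currentStepExecution);

As with a Job, an execution object (here a StepExecution) is created first and the Step is run with it. Both happen only when shouldStart(...) returns true; otherwise the previous StepExecution is returned unchanged. Now let's look at the code that actually runs the Step.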
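The restart branch of handleStep is easy to misread, so here is a simplified sketch of just that decision. RestartContextSketch, LastExecution, and contextForNewExecution are hypothetical names; an ExecutionContext is modelled as a plain Map, statuses as strings, and shouldStart(...) is left out entirely. The key "reader.read.count" in the demo is also made up. Note that the real code shares the previous ExecutionContext object, whereas this sketch copies it.

```java
import java.util.HashMap;
import java.util.Map;

class RestartContextSketch {

    // Hypothetical, simplified stand-in for the previous StepExecution of this step.
    static final class LastExecution {
        final String status;                 // e.g. "COMPLETED", "FAILED"
        final Map<String, Object> context;   // contents of its ExecutionContext

        LastExecution(String status, Map<String, Object> context) {
            this.status = status;
            this.context = context;
        }
    }

    // Chooses the context for the new StepExecution the way handleStep does:
    // a restart (a previous run exists and did not complete) carries the old context
    // over, minus the "batch.executed" marker; otherwise the step starts from a copy
    // of the handler's default context.
    static Map<String, Object> contextForNewExecution(LastExecution last, Map<String, Object> defaults) {
        boolean isRestart = last != null && !"COMPLETED".equals(last.status);
        if (isRestart) {
            Map<String, Object> carried = new HashMap<>(last.context);
            carried.remove("batch.executed");
            return carried;
        }
        return new HashMap<>(defaults);
    }

    public static void main(String[] args) {
        Map<String, Object> old = new HashMap<>();
        old.put("reader.read.count", 30);    // hypothetical key a reader might have saved
        old.put("batch.executed", true);
        Map<String, Object> ctx = contextForNewExecution(new LastExecution("FAILED", old), Map.of());
        System.out.println(ctx); // {reader.read.count=30}
    }
}
```

This is why a failed step can resume where it left off: whatever state was saved into its ExecutionContext is handed to the next attempt.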

StepBuilderFactory

Let's look again at just the code from the example that creates the Step with StepBuilderFactory.

@Autowired
private StepBuilderFactory stepBuilderFactory;
    
@Bean
public Step step() {
    return stepBuilderFactory.get("simple-step")
            .<String, StringWrapper>chunk(10)
            .reader(itemReader())
            .processor(itemProcess())
            .writer(itemWriter())
            .build();
}

StepBuilderFactory.get

public StepBuilder get(String name) {
    StepBuilder builder = new StepBuilder(name).repository(jobRepository).transactionManager(
            transactionManager);
    return builder;
}

The example used StepBuilderFactory to create the Step. Its get method creates a StepBuilder with the JobRepository and transaction manager already set, and returns it.

StepBuilder.chunk

public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
    return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
}

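chunk returns a SimpleStepBuilder. The overload shown takes a CompletionPolicy; the example's chunk(10) calls the chunk(int chunkSize) overload, which likewise returns a new SimpleStepBuilder<I, O>(this) configured with that chunk size.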
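StepBuilderFactory.get and StepBuilder.chunk together form a small hand-off: the factory pre-wires shared dependencies, and chunk switches to a more specific builder that copies what was set so far. The sketch below models that idea with hypothetical classes (Factory, BasicBuilder, ChunkBuilder, Repo, TxManager); it illustrates the pattern only and is not the actual Spring Batch class hierarchy.

```java
class BuilderHandoffSketch {

    static final class Repo {}        // hypothetical stand-in for JobRepository
    static final class TxManager {}   // hypothetical stand-in for a transaction manager

    // Plays the role of StepBuilderFactory: every builder it hands out is pre-wired.
    static final class Factory {
        private final Repo repo;
        private final TxManager tx;

        Factory(Repo repo, TxManager tx) { this.repo = repo; this.tx = tx; }

        BasicBuilder get(String name) {
            return new BasicBuilder(name).repository(repo).transactionManager(tx);
        }
    }

    // Plays the role of StepBuilder: only the common properties.
    static final class BasicBuilder {
        final String name;
        Repo repository;
        TxManager transactionManager;

        BasicBuilder(String name) { this.name = name; }

        BasicBuilder repository(Repo r) { this.repository = r; return this; }
        BasicBuilder transactionManager(TxManager t) { this.transactionManager = t; return this; }

        // Hands off to the chunk-specific builder, which copies what was set so far.
        <I, O> ChunkBuilder<I, O> chunk(int chunkSize) {
            return new ChunkBuilder<I, O>(this).chunk(chunkSize);
        }
    }

    // Plays the role of SimpleStepBuilder<I, O>.
    static final class ChunkBuilder<I, O> {
        final String name;
        final Repo repository;
        final TxManager transactionManager;
        int chunkSize;

        ChunkBuilder(BasicBuilder parent) {
            this.name = parent.name;
            this.repository = parent.repository;
            this.transactionManager = parent.transactionManager;
        }

        ChunkBuilder<I, O> chunk(int chunkSize) { this.chunkSize = chunkSize; return this; }
    }

    public static void main(String[] args) {
        Factory factory = new Factory(new Repo(), new TxManager());
        ChunkBuilder<String, Integer> b = factory.get("simple-step").<String, Integer>chunk(10);
        System.out.println(b.name + " chunk=" + b.chunkSize + " repo set=" + (b.repository != null));
        // simple-step chunk=10 repo set=true
    }
}
```

The payoff is that application code never has to pass the repository or transaction manager itself; it only names the step and picks a chunk size.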

SimpleStepBuilder.build

public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
    this.reader = reader;
    return this;
}
    
public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
    this.writer = writer;
    return this;
}
    
public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
    this.processor = processor;
    return this;
}

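Checking what SimpleStepBuilder's reader, processor, and writer methods return, we find they all return the builder itself (return this;), which is what lets the calls be chained. Now let's look at the build method.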
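The return this; pattern is what makes .reader(...).processor(...).writer(...).build() possible. Below is a minimal sketch of the same fluent-builder idea, assuming hypothetical FluentBuilderSketch, Builder, and ChunkStep classes and using java.util.function types in place of ItemReader, ItemProcessor, and ItemWriter. Unlike the real build(), it registers no listeners or streams; its only check (reader and writer must be set) is this sketch's own choice.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class FluentBuilderSketch {

    // Hypothetical product: just holds the three components.
    static final class ChunkStep<I, O> {
        final Supplier<I> reader;
        final Function<I, O> processor;   // may be null: optional in this sketch
        final Consumer<O> writer;

        ChunkStep(Supplier<I> reader, Function<I, O> processor, Consumer<O> writer) {
            this.reader = reader;
            this.processor = processor;
            this.writer = writer;
        }
    }

    static final class Builder<I, O> {
        private Supplier<I> reader;
        private Function<I, O> processor;
        private Consumer<O> writer;

        // Each setter stores its argument and returns this same builder, so calls chain.
        Builder<I, O> reader(Supplier<I> r) { this.reader = r; return this; }
        Builder<I, O> processor(Function<I, O> p) { this.processor = p; return this; }
        Builder<I, O> writer(Consumer<O> w) { this.writer = w; return this; }

        // build() validates the accumulated state and creates the product.
        ChunkStep<I, O> build() {
            if (reader == null || writer == null) {
                throw new IllegalStateException("reader and writer are required");
            }
            return new ChunkStep<>(reader, processor, writer);
        }
    }

    public static void main(String[] args) {
        Builder<String, Integer> builder = new Builder<>();
        // Chained calls all operate on the same builder instance.
        boolean same = builder.reader(() -> "42") == builder;
        ChunkStep<String, Integer> step = builder.processor(Integer::parseInt)
                .writer(System.out::println)
                .build();
        System.out.println(same + " " + step.processor.apply(step.reader.get())); // true 42
    }
}
```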

@Override
public TaskletStep build() {

    registerStepListenerAsItemListener();
    registerAsStreamsAndListeners(reader, processor, writer);
    return super.build();
}

build returns a TaskletStep. In other words, the Step implementation that SimpleStepHandler.handleStep executed is a TaskletStep. Let's look at TaskletStep's execute method.

The Step built in this example just happens to be a TaskletStep; there are various other implementations of the Step interface. We'll look at those some other time...

TaskletStep.execute

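As you might have guessed, TaskletStep extends AbstractStep, a structure that closely mirrors the Job side (SimpleJob extends AbstractJob). TaskletStep does not override execute, which is declared final in AbstractStep, so the step.execute(currentStepExecution) call lands in AbstractStep.execute.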

AbstractStep.execute

@Override
public final void execute(StepExecution stepExecution) throws JobInterruptedException,
UnexpectedJobExecutionException {

    if (logger.isDebugEnabled()) {
        logger.debug("Executing: id=" + stepExecution.getId());
    }
    stepExecution.setStartTime(new Date());
    stepExecution.setStatus(BatchStatus.STARTED);
    getJobRepository().update(stepExecution);

    // Start with a default value that will be trumped by anything
    ExitStatus exitStatus = ExitStatus.EXECUTING;

    doExecutionRegistration(stepExecution);

    try {
        getCompositeListener().beforeStep(stepExecution);
        open(stepExecution.getExecutionContext());

        try {
            doExecute(stepExecution);
        }
        catch (RepeatException e) {
            throw e.getCause();
        }
        exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());

        // Check if someone is trying to stop us
        if (stepExecution.isTerminateOnly()) {
            throw new JobInterruptedException("JobExecution interrupted.");
        }

        // Need to upgrade here not set, in case the execution was stopped
        stepExecution.upgradeStatus(BatchStatus.COMPLETED);
        if (logger.isDebugEnabled()) {
            logger.debug("Step execution success: id=" + stepExecution.getId());
        }
    }
    catch (Throwable e) {
        stepExecution.upgradeStatus(determineBatchStatus(e));
        exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
        stepExecution.addFailureException(e);
        if (stepExecution.getStatus() == BatchStatus.STOPPED) {
            logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage()));
            if (logger.isDebugEnabled()) {
                logger.debug("Full exception", e);
            }
        }
        else {
            logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }
    }
    finally {

        try {
            // Update the step execution to the latest known value so the
            // listeners can act on it
            exitStatus = exitStatus.and(stepExecution.getExitStatus());
            stepExecution.setExitStatus(exitStatus);
            exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
        }
        catch (Exception e) {
            logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }

        try {
            getJobRepository().updateExecutionContext(stepExecution);
        }
        catch (Exception e) {
            stepExecution.setStatus(BatchStatus.UNKNOWN);
            exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
            stepExecution.addFailureException(e);
            logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
                    + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }

        stepExecution.setEndTime(new Date());
        stepExecution.setExitStatus(exitStatus);

        try {
            getJobRepository().update(stepExecution);
        }
        catch (Exception e) {
            stepExecution.setStatus(BatchStatus.UNKNOWN);
            stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
            stepExecution.addFailureException(e);
            logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
                    + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }

        try {
            close(stepExecution.getExecutionContext());
        }
        catch (Exception e) {
            logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
            stepExecution.addFailureException(e);
        }

        doExecutionRelease();

        if (logger.isDebugEnabled()) {
            logger.debug("Step execution complete: " + stepExecution.getSummary());
        }
    }
}
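Steps have listeners too: StepExecutionListener, a sub-interface of StepListener. Things to note in execute:
  • Pre-processing: getCompositeListener().beforeStep(stepExecution);
  • Post-processing: getCompositeListener().afterStep(stepExecution)
  • execute calls doExecute, an abstract method that the subclass (here TaskletStep) implements.
  • It manages and updates the Step's status (STARTED, then COMPLETED or a failure status, UNKNOWN if saving the metadata fails) and saves it through the JobRepository.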
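AbstractStep.execute is an instance of the template method pattern: a final method fixes the lifecycle (status changes, listener callbacks, error handling) and leaves the actual work to the abstract doExecute. The sketch below shows just that skeleton. SketchStep, Listener, and TemplateMethodDemo are hypothetical names, statuses are plain strings, and JobRepository updates, streams, and exit statuses are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the AbstractStep.execute flow.
abstract class SketchStep {

    interface Listener {                       // stand-in for StepExecutionListener
        void beforeStep();
        void afterStep(String status);
    }

    private final List<Listener> listeners = new ArrayList<>();
    String status = "STARTING";                // stand-in for BatchStatus

    void register(Listener l) { listeners.add(l); }

    // final: subclasses cannot change the lifecycle order, only supply doExecute.
    final void execute() {
        status = "STARTED";
        try {
            listeners.forEach(Listener::beforeStep);   // pre-processing
            doExecute();                               // subclass-specific work
            status = "COMPLETED";
        } catch (Exception e) {
            status = "FAILED";   // the real code derives this via determineBatchStatus(e)
        } finally {
            for (Listener l : listeners) {
                l.afterStep(status);                   // post-processing always runs
            }
        }
    }

    protected abstract void doExecute() throws Exception;
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SketchStep ok = new SketchStep() {
            @Override
            protected void doExecute() { log.add("work"); }
        };
        ok.register(new SketchStep.Listener() {
            public void beforeStep() { log.add("before"); }
            public void afterStep(String status) { log.add("after:" + status); }
        });
        ok.execute();
        System.out.println(log); // [before, work, after:COMPLETED]
    }
}
```

Making execute final is the design choice that guarantees every Step implementation gets the same listener and status handling, whatever its doExecute does.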

TaskletStep.doExecute

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
    stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

    stream.update(stepExecution.getExecutionContext());
    getJobRepository().updateExecutionContext(stepExecution);

    // Shared semaphore per step execution, so other step executions can run
    // in parallel without needing the lock
    final Semaphore semaphore = createSemaphore();

    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

        @Override
        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
                throws Exception {

            StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

            // Before starting a new transaction, check for
            // interruption.
            interruptionPolicy.checkInterrupted(stepExecution);

            RepeatStatus result;
            try {
                result = new TransactionTemplate(transactionManager, transactionAttribute)
                .execute(new ChunkTransactionCallback(chunkContext, semaphore));
            }
            catch (UncheckedTransactionException e) {
                // Allow checked exceptions to be thrown inside callback
                throw (Exception) e.getCause();
            }

            chunkListener.afterChunk(chunkContext);

            // Check for interruption after transaction as well, so that
            // the interrupted exception is correctly propagated up to
            // caller
            interruptionPolicy.checkInterrupted(stepExecution);

            return result;
        }

    });

}
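  • Running ItemReader, ItemProcessor, and ItemWriter: this does not happen in stream.update(stepExecution.getExecutionContext()); that call only saves the state of the registered ItemStreams (such as a reader's position) into the ExecutionContext. The actual work happens inside stepOperations.iterate: each iteration opens a transaction with TransactionTemplate and runs ChunkTransactionCallback, which calls tasklet.execute(...). For a step built with chunk(), the tasklet is a ChunkOrientedTasklet, which reads, processes, and writes one chunk.
  • final Semaphore semaphore = createSemaphore(); creates a java.util.concurrent.Semaphore. As the code comment says, there is one semaphore per step execution; it serializes updates to the shared StepExecution when chunks are processed concurrently, while other step executions can run in parallel without needing that lock. How many items are handled in one transaction is decided by the chunk's completion policy (chunk(10) in the example), not by the semaphore.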
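To tie chunk(10) to the transaction loop, here is a hypothetical, single-threaded model of chunk-oriented processing: an outer loop standing in for stepOperations.iterate, and an inner loop that reads at most chunkSize items, processes them, and writes the chunk in one call before "committing". ChunkLoopSketch and run are made-up names, the reader is a plain Iterator rather than an ItemReader that returns null at the end, and no real transaction, semaphore, or retry/skip logic is involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ChunkLoopSketch {

    // Returns the number of simulated commits (one per chunk that was written).
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int commits = 0;
        while (true) {                        // outer loop: role of stepOperations.iterate
            List<O> chunk = new ArrayList<>();
            int read = 0;
            // inner loop: the completion policy, i.e. "read at most chunkSize items"
            while (read < chunkSize && reader.hasNext()) {
                I item = reader.next();
                read++;
                O out = processor.apply(item);
                if (out != null) {
                    chunk.add(out);           // a null result filters the item out
                }
            }
            if (read == 0) {
                return commits;               // input exhausted: stop repeating
            }
            writer.accept(chunk);             // the whole chunk is written in one call
            commits++;                        // stand-in for committing this chunk's transaction
        }
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            items.add(i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        int commits = run(items.iterator(), n -> "item-" + n, chunk -> chunkSizes.add(chunk.size()), 10);
        System.out.println(commits + " commits, chunk sizes " + chunkSizes); // 3 commits, chunk sizes [10, 10, 5]
    }
}
```

With 25 items and a chunk size of 10, the writer is called three times with 10, 10, and 5 items, so each "transaction" covers at most one chunk.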

Source: https://blog.woniper.net/358?category=699184