How Spring Batch Works in Code #Step Creation and Execution
This post walks through the source code to see how a Job runs its list of Steps.
SimpleJob.doExecute
@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
        StartLimitExceededException {
    StepExecution stepExecution = null;
    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            //
            // Terminate the job if a step fails
            //
            break;
        }
    }
    //
    // Update the job status to be the same as the last step
    //
    if (stepExecution != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("Upgrading JobExecution status: " + stepExecution);
        }
        execution.upgradeStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
    }
}
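SimpleJob was covered briefly in the previous post. doExecute is shown again here because it is the method that runs the Step list. More precisely, this line runs each Step: stepExecution = handleStep(step, execution);
The loop stops at the first Step whose status is not COMPLETED, and the Job then takes its status from the last Step that ran.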
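The control flow of that loop can be sketched without Spring Batch at all. The following is a minimal model, not the library's code: SketchStatus, SketchStep and SequentialJobSketch are hypothetical names invented for this illustration, and each fake step simply reports a preset status instead of doing work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are Spring Batch types.
enum SketchStatus { COMPLETED, FAILED }

final class SketchStep {
    final String name;
    final SketchStatus result; // the status this fake step reports when it is "run"

    SketchStep(String name, SketchStatus result) {
        this.name = name;
        this.result = result;
    }
}

final class SequentialJobSketch {
    // Names of the steps that were actually run, in order.
    final List<String> executed = new ArrayList<>();

    // Runs the steps in order and returns the status of the last step that ran (null if there were none).
    SketchStatus run(List<SketchStep> steps) {
        SketchStatus last = null;
        for (SketchStep step : steps) {
            executed.add(step.name);
            last = step.result;
            if (last != SketchStatus.COMPLETED) {
                break; // same rule as the loop above: stop at the first step that does not complete
            }
        }
        return last;
    }

    public static void main(String[] args) {
        SequentialJobSketch job = new SequentialJobSketch();
        SketchStatus status = job.run(Arrays.asList(
                new SketchStep("load", SketchStatus.COMPLETED),
                new SketchStep("transform", SketchStatus.FAILED),
                new SketchStep("export", SketchStatus.COMPLETED)));
        System.out.println(job.executed + " -> " + status);
    }
}
```

Running main prints `[load, transform] -> FAILED`: the third step never runs, and the job-level result comes from the last step that did.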
AbstractJob.handleStep
protected final StepExecution handleStep(Step step, JobExecution execution)
        throws JobInterruptedException, JobRestartException,
        StartLimitExceededException {
    return stepHandler.handleStep(step, execution);
}
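AbstractJob.handleStep simply passes the Step and the JobExecution on as arguments to StepHandler.handleStep.
Notice the return type, StepExecution, which looks familiar: its name closely resembles JobExecution, the object that was needed to run a Job.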
SimpleStepHandler.handleStep
@Override
public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
        JobRestartException, StartLimitExceededException {
    if (execution.isStopping()) {
        throw new JobInterruptedException("JobExecution interrupted.");
    }
    JobInstance jobInstance = execution.getJobInstance();
    StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
    if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
        // If the last execution of this step was in the same job, it's
        // probably intentional so we want to run it again...
        logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. "
                + "If either step fails, both will be executed again on restart.", step.getName(), jobInstance
                .getJobName()));
        lastStepExecution = null;
    }
    StepExecution currentStepExecution = lastStepExecution;
    if (shouldStart(lastStepExecution, execution, step)) {
        currentStepExecution = execution.createStepExecution(step.getName());
        boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
                BatchStatus.COMPLETED));
        if (isRestart) {
            currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
            if(lastStepExecution.getExecutionContext().containsKey("batch.executed")) {
                currentStepExecution.getExecutionContext().remove("batch.executed");
            }
        }
        else {
            currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
        }
        jobRepository.add(currentStepExecution);
        logger.info("Executing step: [" + step.getName() + "]");
        try {
            step.execute(currentStepExecution);
            currentStepExecution.getExecutionContext().put("batch.executed", true);
        }
        catch (JobInterruptedException e) {
            // Ensure that the job gets the message that it is stopping
            // and can pass it on to other steps that are executing
            // concurrently.
            execution.setStatus(BatchStatus.STOPPING);
            throw e;
        }
        jobRepository.updateExecutionContext(execution);
        if (currentStepExecution.getStatus() == BatchStatus.STOPPING
                || currentStepExecution.getStatus() == BatchStatus.STOPPED) {
            // Ensure that the job gets the message that it is stopping
            execution.setStatus(BatchStatus.STOPPING);
            throw new JobInterruptedException("Job interrupted by step execution");
        }
    }
    return currentStepExecution;
}
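The handleStep method, which returns a StepExecution, does two things:
- Creates the StepExecution : execution.createStepExecution(step.getName());
- Runs the Step : step.execute(currentStepExecution);
Just as with a Job, it creates the StepExecution object that the Step needs in order to run, and then runs the Step. To see the code that actually runs, we first need to know which Step implementation the example creates.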
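One detail in handleStep that is easy to miss is the isRestart branch: when the previous execution of the same step did not complete, the new StepExecution inherits its ExecutionContext, minus the "batch.executed" marker. The sketch below models only that decision with plain maps. RestartContextSketch, the string status names and the "read.count" key are assumptions made for illustration; the real code also shares the previous context object rather than copying it, and in the non-restart case copies the handler's own context instead of creating an empty one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the restart branch; not a Spring Batch class.
final class RestartContextSketch {
    static final String EXECUTED_FLAG = "batch.executed";

    // lastStatus: status name of the previous execution of the same step, or null if it never ran.
    // lastContext: the context saved by that previous execution (ignored when this is not a restart).
    static Map<String, Object> contextForNewExecution(String lastStatus, Map<String, Object> lastContext) {
        boolean isRestart = lastStatus != null && !lastStatus.equals("COMPLETED");
        if (!isRestart) {
            return new HashMap<>(); // first run, or the previous run completed: start clean
        }
        // Simplification: copy the saved state instead of sharing the same object.
        Map<String, Object> context = new HashMap<>(lastContext);
        context.remove(EXECUTED_FLAG); // the new execution has not run yet
        return context;
    }

    public static void main(String[] args) {
        Map<String, Object> saved = new HashMap<>();
        saved.put("read.count", 40); // hypothetical state left behind by a failed run
        saved.put(EXECUTED_FLAG, true);
        System.out.println(contextForNewExecution("FAILED", saved));
        System.out.println(contextForNewExecution(null, saved));
    }
}
```

The first call prints `{read.count=40}` and the second prints `{}`, which is the point of the branch: a restarted step can resume from saved state, while a fresh one starts empty.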
StepBuilderFactory
Let's look again at just the code that creates the Step with StepBuilderFactory.
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Step step() {
    return stepBuilderFactory.get("simple-step")
            .<String, StringWrapper>chunk(10)
            .reader(itemReader())
            .processor(itemProcess())
            .writer(itemWriter())
            .build();
}
StepBuilderFactory.get
public StepBuilder get(String name) {
    StepBuilder builder = new StepBuilder(name).repository(jobRepository).transactionManager(
            transactionManager);
    return builder;
}
The example uses StepBuilderFactory to create the Step. Its get method creates a StepBuilder, already configured with the JobRepository and the transaction manager, and returns it.
StepBuilder.chunk
public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
    return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
}
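The chunk method returns a SimpleStepBuilder. The overload shown here takes a CompletionPolicy; the example calls the int overload, chunk(10), which returns a SimpleStepBuilder in the same way.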
SimpleStepBuilder.build
public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
    this.reader = reader;
    return this;
}

public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
    this.writer = writer;
    return this;
}

public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
    this.processor = processor;
    return this;
}
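Checking what the reader, processor and writer methods of SimpleStepBuilder return, each one returns the builder itself (return this;), which is what makes the chained calls possible. Now let's look at the build method.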
@Override
public TaskletStep build() {
    registerStepListenerAsItemListener();
    registerAsStreamsAndListeners(reader, processor, writer);
    return super.build();
}
It returns a TaskletStep object. In other words, the implementation of the Step interface that SimpleStepHandler.handleStep executed is TaskletStep. Let's look at the execute method of TaskletStep.
Note that TaskletStep is simply what the Step built in this example happens to be. The Step interface has several other implementations, which will be covered in a later post.
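Stepping back to the builder chain traced above (get, chunk, reader/processor/writer, build), its shape can be reproduced in miniature. This is only a sketch of the pattern: MiniStep, MiniStepBuilder and MiniChunkStepBuilder are hypothetical types, a plain java.util.function.Function stands in for the processor, and the reader and writer are left out to keep it short.

```java
import java.util.function.Function;

// Hypothetical miniature types for illustration; not Spring Batch classes.
final class MiniStep<I, O> {
    final String name;
    final int chunkSize;
    final Function<? super I, ? extends O> processor;

    MiniStep(String name, int chunkSize, Function<? super I, ? extends O> processor) {
        this.name = name;
        this.chunkSize = chunkSize;
        this.processor = processor;
    }
}

final class MiniStepBuilder {
    private final String name;

    MiniStepBuilder(String name) {
        this.name = name;
    }

    // Like chunk(...) above: hands off to a more specific, typed builder.
    <I, O> MiniChunkStepBuilder<I, O> chunk(int chunkSize) {
        return new MiniChunkStepBuilder<>(name, chunkSize);
    }
}

final class MiniChunkStepBuilder<I, O> {
    private final String name;
    private final int chunkSize;
    private Function<? super I, ? extends O> processor;

    MiniChunkStepBuilder(String name, int chunkSize) {
        this.name = name;
        this.chunkSize = chunkSize;
    }

    // Returning this is what allows the calls to be chained.
    MiniChunkStepBuilder<I, O> processor(Function<? super I, ? extends O> processor) {
        this.processor = processor;
        return this;
    }

    MiniStep<I, O> build() {
        if (processor == null) {
            throw new IllegalStateException("processor is required in this sketch");
        }
        return new MiniStep<>(name, chunkSize, processor);
    }
}

final class MiniBuilderDemo {
    public static void main(String[] args) {
        MiniStep<String, Integer> step = new MiniStepBuilder("simple-step")
                .<String, Integer>chunk(10)
                .processor(String::length)
                .build();
        System.out.println(step.name + " " + step.chunkSize + " " + step.processor.apply("abc"));
    }
}
```

Running MiniBuilderDemo prints `simple-step 10 3`. The type parameters are fixed at the chunk call, which is why the example configuration writes `.<String, StringWrapper>chunk(10)` before setting the reader, processor and writer.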
TaskletStep.execute
As you may have guessed, TaskletStep extends AbstractStep, and execute is defined there. This structure is almost identical to that of Job.
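The relationship between the two classes is the template method pattern: the base class owns a final execute method that fixes the order of the lifecycle, and the subclass supplies only doExecute. A stripped-down sketch, assuming string statuses and a simple log in place of the repository and listeners (AbstractStepSketch and TaskletStepSketch are hypothetical names, not the real classes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the template method structure; not the Spring Batch classes.
abstract class AbstractStepSketch {
    final List<String> log = new ArrayList<>();
    String status = "STARTING";

    // The skeleton is final: subclasses cannot change the order of the lifecycle.
    final void execute() {
        status = "STARTED";
        try {
            log.add("beforeStep");
            doExecute();
            status = "COMPLETED";
        } catch (Exception e) {
            status = "FAILED"; // the failure is recorded here, not rethrown
        } finally {
            log.add("afterStep:" + status); // runs on success and on failure
        }
    }

    // The only part a concrete step has to provide.
    protected abstract void doExecute() throws Exception;
}

final class TaskletStepSketch extends AbstractStepSketch {
    private final boolean fail;

    TaskletStepSketch(boolean fail) {
        this.fail = fail;
    }

    @Override
    protected void doExecute() throws Exception {
        log.add("doExecute");
        if (fail) {
            throw new IllegalStateException("simulated failure");
        }
    }

    public static void main(String[] args) {
        TaskletStepSketch ok = new TaskletStepSketch(false);
        ok.execute();
        System.out.println(ok.log);

        TaskletStepSketch broken = new TaskletStepSketch(true);
        broken.execute();
        System.out.println(broken.log);
    }
}
```

The two lines printed are `[beforeStep, doExecute, afterStep:COMPLETED]` and `[beforeStep, doExecute, afterStep:FAILED]`. In both cases the "after" callback still runs, because it sits in the finally block.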
AbstractStep.execute
@Override
public final void execute(StepExecution stepExecution) throws JobInterruptedException,
        UnexpectedJobExecutionException {
    if (logger.isDebugEnabled()) {
        logger.debug("Executing: id=" + stepExecution.getId());
    }
    stepExecution.setStartTime(new Date());
    stepExecution.setStatus(BatchStatus.STARTED);
    getJobRepository().update(stepExecution);
    // Start with a default value that will be trumped by anything
    ExitStatus exitStatus = ExitStatus.EXECUTING;
    doExecutionRegistration(stepExecution);
    try {
        getCompositeListener().beforeStep(stepExecution);
        open(stepExecution.getExecutionContext());
        try {
            doExecute(stepExecution);
        }
        catch (RepeatException e) {
            throw e.getCause();
        }
        exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
        // Check if someone is trying to stop us
        if (stepExecution.isTerminateOnly()) {
            throw new JobInterruptedException("JobExecution interrupted.");
        }
        // Need to upgrade here not set, in case the execution was stopped
        stepExecution.upgradeStatus(BatchStatus.COMPLETED);
        if (logger.isDebugEnabled()) {
            logger.debug("Step execution success: id=" + stepExecution.getId());
        }
    }
    catch (Throwable e) {
        stepExecution.upgradeStatus(determineBatchStatus(e));
        exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
        stepExecution.addFailureException(e);
        if (stepExecution.getStatus() == BatchStatus.STOPPED) {
            logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage()));
            if (logger.isDebugEnabled()) {
                logger.debug("Full exception", e);
            }
        }
        else {
            logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }
    }
    finally {
        try {
            // Update the step execution to the latest known value so the
            // listeners can act on it
            exitStatus = exitStatus.and(stepExecution.getExitStatus());
            stepExecution.setExitStatus(exitStatus);
            exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
        }
        catch (Exception e) {
            logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }
        try {
            getJobRepository().updateExecutionContext(stepExecution);
        }
        catch (Exception e) {
            stepExecution.setStatus(BatchStatus.UNKNOWN);
            exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
            stepExecution.addFailureException(e);
            logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
                    + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }
        stepExecution.setEndTime(new Date());
        stepExecution.setExitStatus(exitStatus);
        try {
            getJobRepository().update(stepExecution);
        }
        catch (Exception e) {
            stepExecution.setStatus(BatchStatus.UNKNOWN);
            stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
            stepExecution.addFailureException(e);
            logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
                    + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
        }
        try {
            close(stepExecution.getExecutionContext());
        }
        catch (Exception e) {
            logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
            stepExecution.addFailureException(e);
        }
        doExecutionRelease();
        if (logger.isDebugEnabled()) {
            logger.debug("Step execution complete: " + stepExecution.getSummary());
        }
    }
}
- The execute method calls the abstract doExecute method.
- It manages and updates the various statuses of the Step.
- Like a Job, a Step also has a StepListener. The code that invokes it:
  - Before the Step : getCompositeListener().beforeStep(stepExecution);
  - After the Step : getCompositeListener().afterStep(stepExecution)
TaskletStep.doExecute
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
    stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
    stream.update(stepExecution.getExecutionContext());
    getJobRepository().updateExecutionContext(stepExecution);
    // Shared semaphore per step execution, so other step executions can run
    // in parallel without needing the lock
    final Semaphore semaphore = createSemaphore();
    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
        @Override
        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
                throws Exception {
            StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
            // Before starting a new transaction, check for
            // interruption.
            interruptionPolicy.checkInterrupted(stepExecution);
            RepeatStatus result;
            try {
                result = new TransactionTemplate(transactionManager, transactionAttribute)
                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
            }
            catch (UncheckedTransactionException e) {
                // Allow checked exceptions to be thrown inside callback
                throw (Exception) e.getCause();
            }
            chunkListener.afterChunk(chunkContext);
            // Check for interruption after transaction as well, so that
            // the interrupted exception is correctly propagated up to
            // caller
            interruptionPolicy.checkInterrupted(stepExecution);
            return result;
        }
    });
}
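- Saving stream state : stream.update(stepExecution.getExecutionContext()); stores the current state of the registered streams in the ExecutionContext. It does not itself run the ItemReader, ItemProcessor or ItemWriter.
- Running the ItemReader, ItemProcessor and ItemWriter : stepOperations.iterate(...) repeats the callback, and each repetition runs one chunk inside its own transaction through new TransactionTemplate(transactionManager, transactionAttribute).execute(new ChunkTransactionCallback(chunkContext, semaphore)). The tasklet is invoked inside that callback.
- A java.util.concurrent.Semaphore is created once per step execution : final Semaphore semaphore = createSemaphore(); As the comment in the code says, it is shared by the chunk transactions of this step execution, so other step executions can run in parallel without needing the lock.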
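To make the "one repetition, one chunk" idea concrete, here is a minimal sketch of a chunk loop in plain Java. It is a model under stated assumptions rather than Spring Batch's implementation: ChunkLoopSketch is a hypothetical class, an Iterator stands in for the reader, a Function for the processor, and the returned list of lists records what a writer would have received. Transactions, listeners, interruption checks and the semaphore are all left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of chunk-oriented processing; not a Spring Batch class.
final class ChunkLoopSketch {

    // Returns the chunks handed to the "writer", one inner list per chunk.
    static <I, O> List<List<O>> run(Iterator<? extends I> reader,
                                    Function<? super I, ? extends O> processor,
                                    int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<List<O>> written = new ArrayList<>();
        boolean more = true;
        while (more) { // plays the role of the repeated callback
            // In the real step, everything inside one repetition runs in one transaction.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next()); // read up to chunkSize items
            }
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                outputs.add(processor.apply(item)); // process each item
            }
            if (!outputs.isEmpty()) {
                written.add(outputs); // the writer receives the whole chunk at once
            }
            more = reader.hasNext(); // stop when the input is exhausted
        }
        return written;
    }

    public static void main(String[] args) {
        List<List<String>> chunks = ChunkLoopSketch.<String, String>run(
                Arrays.asList("a", "b", "c", "d", "e").iterator(),
                String::toUpperCase,
                2);
        System.out.println(chunks);
    }
}
```

With five items and a chunk size of 2, main prints `[[A, B], [C, D], [E]]`: three repetitions, the last one with a partial chunk.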
Source: https://blog.woniper.net/358?category=699184