How Spring Batch Works in Code #1: Creating and Running a Job

2020. 9. 4. 16:19 Spring Framework/Spring boot

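In this post we read the code to see how the classes introduced in the basic-concepts post depend on each other and how they work together. We can't cover all of the code at once, so this post focuses on how a Job is created and executed.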

Sample

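import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// One Job with one chunk-oriented Step (Spring Batch 4.x builder factories)
@Configuration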
public class SimpleConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("simple-job")
                .start(step())
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("simple-step")
                .<String, StringWrapper>chunk(10)   // 10 items are read, processed, then written together
                .reader(itemReader())
                .processor(itemProcess())
                .writer(itemWriter())
                .build();
    }

    private ItemReader<String> itemReader() {
        List<String> list = new ArrayList<>();

        for (int i = 0; i < 100; i++) {
            list.add("test" + i);
        }

        return new ListItemReader<>(list);
    }

    private ItemProcessor<String, StringWrapper> itemProcess() {
        return StringWrapper::new;
    }

    private ItemWriter<StringWrapper> itemWriter() {
        return System.out::println;
    }

    private static class StringWrapper {
        private String value;

        StringWrapper(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        @Override
        public String toString() {
            return String.format("i'm %s", getValue());
        }
    }
}

The example above is a very simple batch job:

  • The Job has a single Step.
  • The Step's ItemReader reads 100 String values held in an ArrayList. (read)
  • The ItemProcessor wraps each String returned by the ItemReader in a StringWrapper. (process)
  • The ItemWriter prints each chunk, a List of 10 StringWrapper objects (chunk(10) sets the chunk size), with System.out.println. (write)
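The following plain-Java sketch makes the read/process/write flow above concrete. It is not Spring Batch code. `ChunkSketch`, `run` and `sampleItems` are hypothetical names. The sketch also assumes the simple, non-fault-tolerant case: each chunk is read first (up to `chunkSize` items), then every item in it is processed, and then the whole list is handed to the writer in one call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical plain-Java model of chunk-oriented processing (not Spring Batch code).
class ChunkSketch {

    // Same shape as the StringWrapper in the example configuration.
    static class StringWrapper {
        private final String value;

        StringWrapper(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return String.format("i'm %s", value);
        }
    }

    // Returns how many chunks were handed to the writer.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. read: collect up to chunkSize items
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. process: transform every item of the chunk
            List<O> processed = new ArrayList<>();
            for (I item : items) {
                processed.add(processor.apply(item));
            }
            // 3. write: hand the whole chunk to the writer in one call
            writer.accept(processed);
            chunks++;
        }
        return chunks;
    }

    // The same 100 items as the example's itemReader().
    static List<String> sampleItems() {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add("test" + i);
        }
        return list;
    }

    public static void main(String[] args) {
        List<List<StringWrapper>> written = new ArrayList<>();
        int chunks = run(sampleItems().iterator(), StringWrapper::new, written::add, 10);
        System.out.println(chunks + " chunks, first: " + written.get(0));
    }
}
```

With 100 items and a chunk size of 10, the writer is called 10 times with a 10-element list each time. This is why the example's `System.out::println` writer prints lists rather than single items.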

Running the Job

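import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing   // provides JobRepository, JobLauncher, JobBuilderFactory and StepBuilderFactory beans
public class JobRunnerConfiguration {

    // JobLauncherTestUtils autowires the Job, JobLauncher and JobRepository from the context
    @Bean
    public JobLauncherTestUtils utils() {
        return new JobLauncherTestUtils();
    }

}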

This configuration registers JobLauncherTestUtils as a bean so that the test can launch the batch job.

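import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SimpleConfiguration.class, JobRunnerConfiguration.class })
public class SimpleConfigurationTests {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void testLaunchJob() throws Exception {
        // launchJob() runs the Job with unique JobParameters and returns its JobExecution
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
    }
}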
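  • @ContextConfiguration registers SimpleConfiguration and JobRunnerConfiguration as the configuration classes the test needs.
  • The test receives the JobLauncherTestUtils bean via @Autowired and uses it to run the batch Job defined in SimpleConfiguration.
  • JobLauncherTestUtils launches the Job through the JobLauncher bean. With @EnableBatchProcessing, that bean is a SimpleJobLauncher by default.
  • I'll skip the details of JobBuilderFactory and StepBuilderFactory used in SimpleConfiguration. Think of them simply as objects that build Jobs and Steps.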

SimpleJobLauncher


I tried to keep the diagram as simple as possible. Using the example above, let's look at how Spring Batch works internally.

As described above, JobLauncherTestUtils runs the Job through SimpleJobLauncher. Let's look at the code to see how it actually does that.

@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
        JobParametersInvalidException {

    Assert.notNull(job, "The Job must not be null.");
    Assert.notNull(jobParameters, "The JobParameters must not be null.");

    final JobExecution jobExecution;
    JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
    if (lastExecution != null) {
    
        // Check whether the Job is restartable
        if (!job.isRestartable()) {
            throw new JobRestartException("JobInstance already exists and is not restartable");
        }
        /*
         * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
         * retrieve the previous execution and check
         */
        for (StepExecution execution : lastExecution.getStepExecutions()) {
            BatchStatus status = execution.getStatus();
            if (status.isRunning() || status == BatchStatus.STOPPING) {
                throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                        + lastExecution);
            } else if (status == BatchStatus.UNKNOWN) {
                throw new JobRestartException(
                        "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                            + "The last execution ended with a failure that could not be rolled back, "
                            + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
            }
        }
    }

    job.getJobParametersValidator().validate(jobParameters);

    jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

    try {
        taskExecutor.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                            + "]");
                    job.execute(jobExecution);
                    logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                            + "] and the following status: [" + jobExecution.getStatus() + "]");
                }
                catch (Throwable t) {
                    logger.info("Job: [" + job
                            + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                            + "]", t);
                    rethrow(t);
                }
            }

            private void rethrow(Throwable t) {
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }
                else if (t instanceof Error) {
                    throw (Error) t;
                }
                throw new IllegalStateException(t);
            }
        });
    }
    catch (TaskRejectedException e) {
        jobExecution.upgradeStatus(BatchStatus.FAILED);
        if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
            jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
        }
        jobRepository.update(jobExecution);
    }

    return jobExecution;
}
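  • Looks up the last JobExecution and creates a new one through the JobRepository
  • Runs job.execute(jobExecution) inside a Runnable passed to the taskExecutor (a SyncTaskExecutor by default, so the call is synchronous)
  • Returns the JobExecution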

The run method takes a Job and its JobParameters. It uses the JobRepository to look up the previous JobExecution (getLastJobExecution) and to create a new one (createJobExecution).
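The checks that `run` performs on the previous execution can be modeled without Spring. This is a simplified sketch under stated assumptions. `LaunchChecks`, its exception classes and its `Status` enum are hypothetical stand-ins; the enum only mirrors the constant names of `BatchStatus`. The previous execution is reduced to the list of its step statuses.

```java
import java.util.List;

// Hypothetical model of the pre-launch checks in SimpleJobLauncher.run (not Spring Batch code).
class LaunchChecks {

    // Mirrors the constant names of BatchStatus.
    enum Status {
        COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN;

        boolean isRunning() {
            return this == STARTING || this == STARTED;
        }
    }

    static class AlreadyRunningException extends RuntimeException {
        AlreadyRunningException(String message) { super(message); }
    }

    static class RestartException extends RuntimeException {
        RestartException(String message) { super(message); }
    }

    // lastStepStatuses == null means there is no previous JobExecution for these parameters.
    static void check(boolean restartable, List<Status> lastStepStatuses) {
        if (lastStepStatuses == null) {
            return; // first run: nothing to check
        }
        if (!restartable) {
            throw new RestartException("JobInstance already exists and is not restartable");
        }
        for (Status status : lastStepStatuses) {
            if (status.isRunning() || status == Status.STOPPING) {
                throw new AlreadyRunningException("A job execution for this job is already running");
            } else if (status == Status.UNKNOWN) {
                throw new RestartException("Cannot restart step from UNKNOWN status");
            }
        }
    }
}
```

A failed previous run is not rejected here. Whether the instance may run again is decided later, in createJobExecution.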

SimpleJobRepository.createJobExecution

@Override
public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

    Assert.notNull(jobName, "Job name must not be null.");
    Assert.notNull(jobParameters, "JobParameters must not be null.");

    JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
    ExecutionContext executionContext;

    if (jobInstance != null) {

        List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);

        for (JobExecution execution : executions) {
            if (execution.isRunning() || execution.isStopping()) {
                throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                        + jobInstance);
            }
            BatchStatus status = execution.getStatus();
            if (status == BatchStatus.UNKNOWN) {
                throw new JobRestartException("Cannot restart job from UNKNOWN status. "
                        + "The last execution ended with a failure that could not be rolled back, "
                        + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
            }
            if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
                throw new JobInstanceAlreadyCompleteException(
                        "A job instance already exists and is complete for parameters=" + jobParameters
                        + ".  If you want to run this job again, change the parameters.");
            }
        }
        executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
    }
    else {
        jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
        executionContext = new ExecutionContext();
    }

    JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
    jobExecution.setExecutionContext(executionContext);
    jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));

    jobExecutionDao.saveJobExecution(jobExecution);
    ecDao.saveExecutionContext(jobExecution);

    return jobExecution;

}

SimpleJobRepository is an implementation of the JobRepository interface.

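  • Looks up the JobInstance by job name and JobParameters
  • If the JobInstance exists, checks that it can be run again, then loads the ExecutionContext of its last JobExecution (ecDao.getExecutionContext)
  • If the JobInstance is null, creates a new JobInstance and an empty ExecutionContext
  • Finally, creates a new JobExecution and saves it along with its ExecutionContext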
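The branching in `createJobExecution` can be sketched with in-memory maps. Everything here is hypothetical: `InMemoryJobRepository`, `Execution`, and a String map standing in for `JobParameters`. The sketch also simplifies by tracking only a status per execution, whereas the real class delegates to `jobInstanceDao`, `jobExecutionDao` and `ecDao`. The rule it illustrates is this: the same job name plus the same parameters means the same JobInstance. That instance may run again only if none of its executions is running, UNKNOWN, or (with non-empty parameters) COMPLETED/ABANDONED.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of SimpleJobRepository.createJobExecution (not Spring Batch code).
class InMemoryJobRepository {

    // Mirrors the constant names of BatchStatus.
    enum Status { COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN }

    static class Execution {
        final long instanceId;
        Status status = Status.STARTING;
        final Map<String, Object> context;   // stands in for the ExecutionContext

        Execution(long instanceId, Map<String, Object> context) {
            this.instanceId = instanceId;
            this.context = context;
        }
    }

    // A JobInstance is identified by (job name, parameters).
    private final Map<List<Object>, Long> instanceIds = new HashMap<>();
    private final Map<Long, List<Execution>> executionsByInstance = new HashMap<>();
    private long nextInstanceId = 1;

    Execution createJobExecution(String jobName, Map<String, String> params) {
        List<Object> key = Arrays.asList(jobName, new HashMap<>(params));
        Long instanceId = instanceIds.get(key);
        Map<String, Object> context;

        if (instanceId != null) {
            List<Execution> previous = executionsByInstance.get(instanceId);
            for (Execution e : previous) {
                if (e.status == Status.STARTING || e.status == Status.STARTED || e.status == Status.STOPPING) {
                    throw new IllegalStateException("A job execution for this job is already running");
                }
                if (e.status == Status.UNKNOWN) {
                    throw new IllegalStateException("Cannot restart job from UNKNOWN status");
                }
                if (!params.isEmpty() && (e.status == Status.COMPLETED || e.status == Status.ABANDONED)) {
                    throw new IllegalStateException("A job instance already exists and is complete");
                }
            }
            // Existing instance: continue from the last execution's context.
            context = new HashMap<>(previous.get(previous.size() - 1).context);
        } else {
            // New instance: start with an empty context.
            instanceId = nextInstanceId++;
            instanceIds.put(key, instanceId);
            executionsByInstance.put(instanceId, new ArrayList<>());
            context = new HashMap<>();
        }

        Execution execution = new Execution(instanceId, context);
        executionsByInstance.get(instanceId).add(execution);   // "save" the new execution
        return execution;
    }
}
```

Note the empty-parameter case. A completed instance with no parameters is not rejected, which mirrors the `getParameters().size() > 0` condition in the original code.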

A few objects stand out in this code.

JobExecution

I covered JobExecution in the previous post: it is the object created each time a Job runs. It holds the objects the Job needs in order to run, including:

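  • JobParameters: the parameters needed to run the Job
  • JobInstance: the logical job run this execution belongs to (it carries an id and the job name)
  • stepExecutions: the collection of StepExecutions created as the Job's steps run
  • createTime, startTime, endTime, lastUpdated: when the execution was created, started, ended, and last updated
  • Other fields such as status, exitStatus, and executionContext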
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile Date startTime = null;
private volatile Date createTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
private volatile ExecutionContext executionContext = new ExecutionContext();
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();
private final String jobConfigurationName;

These are the member fields declared in JobExecution.

ExecutionContext

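ExecutionContext stores the data a Job needs while it runs as key/value pairs in memory. If you look at the class itself, it stores and reads that data through a Map.

Its lifecycle follows the JobExecution: it is used while the Job runs. The JobRepository also saves it (ecDao.saveExecutionContext). That is why createJobExecution above can reload the last execution's ExecutionContext when the same JobInstance is run again.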
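The real ExecutionContext exposes typed accessors over an internal Map, such as `putString`, `getString`, `putInt`, `getInt`, `containsKey` and `isDirty`. The class below is a hypothetical, simplified stand-in (`MiniExecutionContext`, not the Spring Batch class) that shows the idea: a thread-safe Map plus a dirty flag that tells a repository whether there is anything new to save.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for ExecutionContext: a thread-safe Map plus a dirty flag.
class MiniExecutionContext {

    private final Map<String, Object> map = new ConcurrentHashMap<>();
    private volatile boolean dirty = false;   // true when something changed since the last save

    void putString(String key, String value) { put(key, value); }

    void putInt(String key, int value) { put(key, value); }

    void put(String key, Object value) {
        if (value == null) {                  // treat null as "remove" (ConcurrentHashMap rejects nulls)
            dirty = map.remove(key) != null || dirty;
            return;
        }
        Object previous = map.put(key, value);
        dirty = dirty || !value.equals(previous);   // unchanged values do not mark the context dirty
    }

    String getString(String key) { return (String) map.get(key); }

    int getInt(String key, int defaultValue) {
        Object value = map.get(key);
        return value == null ? defaultValue : (Integer) value;
    }

    boolean containsKey(String key) { return map.containsKey(key); }

    boolean isDirty() { return dirty; }

    // A repository would call this after persisting the context.
    void clearDirtyFlag() { dirty = false; }
}
```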

job.execute(jobExecution);

Back to the SimpleJobLauncher code. job.execute is where the Job actually runs, and it receives the JobExecution created through the JobRepository as its argument. In other words, JobExecution is the object the Job needs in order to run.
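In SimpleJobLauncher.run, `job.execute(jobExecution)` is wrapped in a Runnable and handed to the `taskExecutor`. When no TaskExecutor is set, SimpleJobLauncher falls back to a synchronous SyncTaskExecutor, so `run` returns only after the Job has finished. With an asynchronous executor, `run` returns the JobExecution while the Job is still running. The sketch below uses `java.util.concurrent.Executor` to show the difference. `LaunchSketch` and `FakeExecution` are hypothetical, and the latch exists only so the demo can control when the job finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// Hypothetical sketch of how a launcher hands job.execute(...) to an executor (not Spring Batch code).
class LaunchSketch {

    static class FakeExecution {
        volatile String status = "STARTING";
    }

    // Stand-in for job.execute(jobExecution): the job body updates the execution it receives.
    static void executeJob(FakeExecution execution, CountDownLatch finishSignal) {
        execution.status = "STARTED";
        try {
            finishSignal.await();   // demo-only: wait until the caller lets the job finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        execution.status = "COMPLETED";
    }

    static FakeExecution run(Executor taskExecutor, CountDownLatch finishSignal) {
        FakeExecution execution = new FakeExecution();            // created before the job starts
        taskExecutor.execute(() -> executeJob(execution, finishSignal));
        return execution;                                         // returned whether or not the job is done
    }
}
```

Passing `Runnable::run` as the executor behaves like a synchronous executor because the job runs on the caller's thread. A thread pool makes `run` return immediately.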

AbstractJob.execute

@Override
public final void execute(JobExecution execution) {

    if (logger.isDebugEnabled()) {
        logger.debug("Job execution starting: " + execution);
    }

    // 1. Register the JobExecution about to run in a ThreadLocal
    JobSynchronizationManager.register(execution);

    try {

        // 2. Validate the JobParameters to check that the Job can run
        jobParametersValidator.validate(execution.getJobParameters());

        if (execution.getStatus() != BatchStatus.STOPPING) {

            // 3. Record the start time
            execution.setStartTime(new Date());
            // 4. Change the batch status to STARTED
            updateStatus(execution, BatchStatus.STARTED);

            // 5. Run JobExecutionListener.beforeJob (pre-processing)
            listener.beforeJob(execution);

            try {
                // 6. Run the concrete Job implementation
                doExecute(execution);
                if (logger.isDebugEnabled()) {
                    logger.debug("Job execution complete: " + execution);
                }
            } catch (RepeatException e) {
                throw e.getCause();
            }
        } else {

            // The job was already stopped before we even got this far. Deal
            // with it in the same way as any other interruption.
            execution.setStatus(BatchStatus.STOPPED);
            execution.setExitStatus(ExitStatus.COMPLETED);
            if (logger.isDebugEnabled()) {
                logger.debug("Job execution was stopped: " + execution);
            }

        }

    } catch (JobInterruptedException e) {
        logger.info("Encountered interruption executing job: "
                + e.getMessage());
        if (logger.isDebugEnabled()) {
            logger.debug("Full exception", e);
        }
        execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
        execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
        execution.addFailureException(e);
    } catch (Throwable t) {
        logger.error("Encountered fatal error executing job", t);
        execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
        execution.setStatus(BatchStatus.FAILED);
        execution.addFailureException(t);
    } finally {
        try {
            if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
                    && execution.getStepExecutions().isEmpty()) {
                ExitStatus exitStatus = execution.getExitStatus();
                ExitStatus newExitStatus =
                        ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
                execution.setExitStatus(exitStatus.and(newExitStatus));
            }

            execution.setEndTime(new Date());

            try {
                // 7. Run JobExecutionListener.afterJob (post-processing)
                listener.afterJob(execution);
            } catch (Exception e) {
                logger.error("Exception encountered in afterStep callback", e);
            }

            jobRepository.update(execution);
        } finally {
            JobSynchronizationManager.release();
        }

    }

}

The code is long, but if you follow the numbered comments, the logic is not that complicated.
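`execute` is a template method. It is `final`, owns the lifecycle (status, listeners, error handling), and leaves the actual work to `doExecute`. Below is a heavily trimmed, hypothetical model of that shape; `MiniAbstractJob`, `Listener` and `Exec` are not Spring Batch types. It keeps steps 5 to 7 and the failure path, but omits stop handling, JobRepository updates and the ThreadLocal registration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down model of the template method in AbstractJob.execute (not Spring Batch code).
abstract class MiniAbstractJob {

    interface Listener {
        void beforeJob(Exec execution);
        void afterJob(Exec execution);
    }

    static class Exec {
        String status = "STARTING";
        final List<String> failures = new ArrayList<>();
    }

    private final Listener listener;

    MiniAbstractJob(Listener listener) {
        this.listener = listener;
    }

    // final: subclasses customize doExecute only, never the surrounding lifecycle
    final void execute(Exec execution) {
        try {
            execution.status = "STARTED";
            listener.beforeJob(execution);          // pre-processing
            doExecute(execution);                   // implemented by the concrete job
        } catch (RuntimeException e) {
            execution.status = "FAILED";            // the failure is recorded, not rethrown
            execution.failures.add(e.getMessage());
        } finally {
            listener.afterJob(execution);           // post-processing, on success and on failure
        }
    }

    protected abstract void doExecute(Exec execution);
}
```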

5. Running JobExecutionListener.beforeJob

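JobExecutionListener lets you run pre-processing before the Job starts (beforeJob) and post-processing after it finishes (afterJob). Because afterJob is called from the finally block, it runs even when the Job fails. That is all we need to know for now.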

6. Running the Job implementation

abstract protected void doExecute(JobExecution execution) throws JobExecutionException;

As its name suggests, AbstractJob is an abstract class, and AbstractJob.doExecute is an abstract method. Any Job class that extends AbstractJob must implement doExecute. So which class implements AbstractJob in this example?

SimpleJob.doExecute

@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {

    StepExecution stepExecution = null;
    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            //
            // Terminate the job if a step fails
            //
            break;
        }
    }

    //
    // Update the job status to be the same as the last step
    //
    if (stepExecution != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("Upgrading JobExecution status: " + stepExecution);
        }
        execution.upgradeStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
    }
}

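SimpleJob is the class that extends AbstractJob here: jobBuilderFactory.get("simple-job").start(step()).build() in the example returns a SimpleJob. Its doExecute runs the Steps in order and stops at the first Step that does not end as COMPLETED. It then updates the JobExecution's status and exit status from that last executed Step.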
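The step loop in SimpleJob.doExecute boils down to the sketch below. It is a hypothetical model (`MiniSimpleJob` and its `Status` enum are illustrative only) in which each Step is reduced to a supplier of its final status.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of SimpleJob.doExecute (not Spring Batch code).
class MiniSimpleJob {

    enum Status { COMPLETED, FAILED, STOPPED }

    private final List<Supplier<Status>> steps;   // each "step" runs and reports its final status
    private int executedSteps = 0;

    MiniSimpleJob(List<Supplier<Status>> steps) {
        this.steps = steps;
    }

    // Returns the status of the last executed step, or null if there were no steps
    // (in that case the real SimpleJob leaves the JobExecution status unchanged).
    Status doExecute() {
        Status last = null;
        for (Supplier<Status> step : steps) {
            last = step.get();
            executedSteps++;
            if (last != Status.COMPLETED) {
                break;   // terminate the job if a step fails
            }
        }
        return last;
    }

    int executedSteps() {
        return executedSteps;
    }
}
```

For example, with steps that end COMPLETED, FAILED, COMPLETED, only the first two run and the job ends as FAILED.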

How a Step is executed will be covered in the next post.