Spring Batch의 동작 코드 #Job 생성과 실행1
이번 글에선 기본 개념에 정리한 여러 클래스가 서로 어떤 의존성을 가지며, 어떻게 동작하는지 코드를 살펴볼 것이다. 모든 코드를 다 볼 수 없기에 이번 글은 Job이 어떻게 생성되고 실행되는지 살펴본다.
Sample
@Configuration
public class SimpleConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("simple-job")
.start(step())
.build();
}
@Bean
public Step step() {
return stepBuilderFactory.get("simple-step")
.<String, StringWrapper>chunk(10)
.reader(itemReader())
.processor(itemProcess())
.writer(itemWriter())
.build();
}
private ItemReader<String> itemReader() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add("test" + i);
}
return new ListItemReader(list);
}
private ItemProcessor<String, StringWrapper> itemProcess() {
return StringWrapper::new;
}
private ItemWriter<StringWrapper> itemWriter() {
return System.out::println;
}
private class StringWrapper {
private String value;
StringWrapper(String value) {
this.value = value;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return String.format("i'm %s", getValue());
}
}
}
위 예제는 아주 단순한 batch다.
- Job은 하나의 Step을 갖고 있으며,
- Step의 ItemReader는 ArrayList에 100개의
String value
를 담고 있다. (읽기) - ItemProcessor는 ItemReader에서 반환된 String List를
StringWrapper
클래스로 wrapping 한다. (가공) - ItemWriter는 ItemProcessor를 통해 StringWrapper로 반환된 List를
System.out.println
으로 로그를 찍는다. (쓰기)
실행
@Configuration
@EnableBatchProcessing
public class JobRunnerConfiguration {
@Bean
public JobLauncherTestUtils utils() throws Exception {
return new JobLauncherTestUtils();
}
}
Batch를 실행하기 위한 JobLauncherTestUtils
를 Bean으로 등록
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SimpleConfiguration.class, JobRunnerConfiguration.class})
public class SimpleConfigurationTests {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void testLaunchJob() throws Exception {
jobLauncherTestUtils.launchJob();
}
}
SimpleConfiguration, JobRunnerConfiguration
을@Contextconfiguration
을 이용해 테스트에 필요한 Config Bean으로 등록- Bean으로 등록된
JobLauncherTestUtils
를 주입(@Autowired) 받아 Batch Job(SimpleConfiguration)을 실행 - 이때 JobLauncherTestUtils은
SimpleJobLauncher
를 이용해 Batch Job을 실행
- SimpleConfiguration 예제의
JobBuilderFactory, StepBuilderFactory
의 자세한 설명은 생략한다. Job과 Step을 생성하는 객체라고 생각하면 된다.
SimpleJobLauncher
최대한 간단하게 diagram을 그리려 노력했다. 위 예제를 기준으로 Spring Batch가 내부적으로 어떻게 동작하는지 살펴보자.
일단 예제에서 JobLauncherTestUtils가 SimpleJobLauncher
를 통해 Job을 실행한다고 설명했다. 실제로 어떻게 실행하는지 코드를 보자.
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {
Assert.notNull(job, "The Job must not be null.");
Assert.notNull(jobParameters, "The JobParameters must not be null.");
final JobExecution jobExecution;
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (lastExecution != null) {
// 재실행 가능한 Job 인지 체크
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}
/*
* validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
* retrieve the previous execution and check
*/
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning() || status == BatchStatus.STOPPING) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ lastExecution);
} else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
}
}
job.getJobParametersValidator().validate(jobParameters);
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+ "]");
job.execute(jobExecution);
logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
+ "] and the following status: [" + jobExecution.getStatus() + "]");
}
catch (Throwable t) {
logger.info("Job: [" + job
+ "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
+ "]", t);
rethrow(t);
}
}
private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}
catch (TaskRejectedException e) {
jobExecution.upgradeStatus(BatchStatus.FAILED);
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
jobRepository.update(jobExecution);
}
return jobExecution;
}
- JobExecution 반환
- JobRepository로 JobExecution 조회 및 생성
run 메소드는 Job 객체와 JobParamter 객체를 받아 JobRepository
를 이용해 JobExecution을 조회(getLastJobExecution) 및 생성(createJobExecution)한다.
SimpleJobRepository.createJobExecution
@Override
public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
Assert.notNull(jobName, "Job name must not be null.");
Assert.notNull(jobParameters, "JobParameters must not be null.");
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
ExecutionContext executionContext;
if (jobInstance != null) {
List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
for (JobExecution execution : executions) {
if (execution.isRunning() || execution.isStopping()) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ jobInstance);
}
BatchStatus status = execution.getStatus();
if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException("Cannot restart job from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
throw new JobInstanceAlreadyCompleteException(
"A job instance already exists and is complete for parameters=" + jobParameters
+ ". If you want to run this job again, change the parameters.");
}
}
executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
}
else {
jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
executionContext = new ExecutionContext();
}
JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
jobExecution.setExecutionContext(executionContext);
jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
jobExecutionDao.saveJobExecution(jobExecution);
ecDao.saveExecutionContext(jobExecution);
return jobExecution;
}
SimpleJobRepository는 JobRepository interface의 구현체.
- JobInstance를 조회
- JobInstance가 null이 아니라면, 실행 가능한 Job인지 체크 후
JobExecution
을 조회(ecDao.getExecutionContext) - JobInstance가 null이라면, JobInstance와
ExecutionContext
를생성
- 마지막으로 JobExecution을
저장
이 코드에서 몇가지 객체가 눈에 들어온다.
JobExecution
앞 글에서 JobExecution에 대해 설명했다. Job이 한번 실행될 때 생성되는 객체다. 이 객체는 Job이 실행되는 데 위해 필요한 아래와 같은 객체를 담고 있다.
- JobParamter : Job을 실행하기 위해 필요한 paramter
- JobInstance : JobExecution을 조회하기 위한 id, name
- Collection
: Job이 포함하고 있는 실행 가능한 StepExecution List - Job 실행 생성, 시작, 종료, 수정 시간
- 그 외 여러 객체
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile Date startTime = null;
private volatile Date createTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
private volatile ExecutionContext executionContext = new ExecutionContext();
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();
private final String jobConfigurationName;
JobExecution에 멤버 변수로 선언된 객체들.
ExecutionContext
ExecutionContext
객체는 Job이 실행되는 동안 필요한 데이터를 메모리(Map)에 저장하고 관리하는 객체다. 실제로 이 객체를 살펴보면 Map을 통해 데이터를 저장, 조회한다.
ExecutionContext의 생명 주기는 Job이 실행되는 동안 사용된다.
job.execute(jobExecution);
다시 SimpleJobLauncher
코드로 돌아가 보자. job.execute 메소드가 바로 Job을 실행하는 부분이다. JobRepository를 통해 생성된 JobExecution
을 argument로 넘긴다. 즉, JobExecution은 Job을 실행하는 데 필요한 객체다.
AbstractJob.execute
@Override
public final void execute(JobExecution execution) {
if (logger.isDebugEnabled()) {
logger.debug("Job execution starting: " + execution);
}
// 1. ThreadLocal에 현재 실행될 Job 등록
JobSynchronizationManager.register(execution);
try {
// 2. 실행 가능한 Job인지 JobParameter 검증
jobParametersValidator.validate(execution.getJobParameters());
if (execution.getStatus() != BatchStatus.STOPPING) {
// 3. 시작 시간 등록
execution.setStartTime(new Date());
// 4. Batch 상태를 시작으로 변경
updateStatus(execution, BatchStatus.STARTED);
// 5. JobExecutionListener.beforeJob 실행 (전 처리)
listener.beforeJob(execution);
try {
// 6. job 구현체 실행
doExecute(execution);
if (logger.isDebugEnabled()) {
logger.debug("Job execution complete: " + execution);
}
} catch (RepeatException e) {
throw e.getCause();
}
} else {
// The job was already stopped before we even got this far. Deal
// with it in the same way as any other interruption.
execution.setStatus(BatchStatus.STOPPED);
execution.setExitStatus(ExitStatus.COMPLETED);
if (logger.isDebugEnabled()) {
logger.debug("Job execution was stopped: " + execution);
}
}
} catch (JobInterruptedException e) {
logger.info("Encountered interruption executing job: "
+ e.getMessage());
if (logger.isDebugEnabled()) {
logger.debug("Full exception", e);
}
execution.setExitStatus(getDefaultExitStatusForFailure(e, execution));
execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus()));
execution.addFailureException(e);
} catch (Throwable t) {
logger.error("Encountered fatal error executing job", t);
execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
execution.setStatus(BatchStatus.FAILED);
execution.addFailureException(t);
} finally {
try {
if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED)
&& execution.getStepExecutions().isEmpty()) {
ExitStatus exitStatus = execution.getExitStatus();
ExitStatus newExitStatus =
ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
execution.setExitStatus(exitStatus.and(newExitStatus));
}
execution.setEndTime(new Date());
try {
// 7. JobExecutionListener.afterJob 실행 (후 처리)
listener.afterJob(execution);
} catch (Exception e) {
logger.error("Exception encountered in afterStep callback", e);
}
jobRepository.update(execution);
} finally {
JobSynchronizationManager.release();
}
}
}
코드가 길고 복잡하지만, 주석을 보면 그렇게 복잡한 로직은 아니다.
5. JobExecutionListener.beforeJob 실행
Job이 실행되기 전처리, 후처리 가능한 JobExecutionListener가 있다. 이 정도만 알고 넘어가자.
6. job 구현체 실행
abstract protected void doExecute(JobExecution execution) throws JobExecutionException;
AbstractJob은 이름과 같이 추상 클래스다. AbstractJob.doExecute
메소드는 추상 메소드다. 이를 상속받아 구현된 Job 객체가 doExecution을 구현하고 있을 것이다. 그럼 이 예제에서 AbstractJob을 구현한 구현 객체는 무엇일까?
SimpleJob.doExecute
@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {
StepExecution stepExecution = null;
for (Step step : steps) {
stepExecution = handleStep(step, execution);
if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
//
// Terminate the job if a step fails
//
break;
}
}
//
// Update the job status to be the same as the last step
//
if (stepExecution != null) {
if (logger.isDebugEnabled()) {
logger.debug("Upgrading JobExecution status: " + stepExecution);
}
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
}
}
SimpleJob
이 바로 AbstractJob을 구현한 구현체다. doExecute 메소드는 Step List를 실행한다.
Step이 실행되는 과정은 다음 포스팅에.
'Spring Framework > Spring boot' 카테고리의 다른 글
실행 중인 Spring Boot pid 파일 생성 (0) | 2020.09.04 |
---|---|
CompletableFuture 비동기 처리로 성능 개선하기 (0) | 2020.09.04 |
Spring Data JPA 같은 이름, 다른 type인 2개의 @Entity인 경우 주의 사항 (0) | 2020.09.04 |
Spring Batch의 동작 코드 #Step 생성과 실행 (0) | 2020.09.04 |
누구나 아는 Spring Batch 기본 개념 (0) | 2020.09.04 |
Spring Data REST #3 내부 동작 (0) | 2020.09.04 |
Spring Data REST #2 동작 원리 (0) | 2020.09.04 |
Spring Data REST #1 Introduction (0) | 2020.09.04 |