728x90
반응형


비대면 고객케어 개발 프로젝트를 진행하면서 spring batch를 활용할 일이 생겼다.
연동을 위해 필요한데, 타 시스템에서 파일형태로 대용량의 데이터를 넘겨주면
주기적으로 넘겨받은 파일을 읽어서 DB에 저장하는 연동이다.

연동 Flow는 간단하지만, 일 20만건 이상의 대용량의 데이터를 다루기 때문에 batch를 고려하게 되었다.

1. Spring Batch란?

Spring Batch는 Spring환경에서 대용량의 데이터 처리를 위한 기능을 제공하는 프레임워크이다.
Batch의 사전적 의미도 일괄처리인 것처럼 Spring환경에서 대용량의 데이터 처리를 위해 사용된다.

대용량의 데이터 처리를 위한 기능으로는 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기, 리소스 관리 등이 있다.

Batch에 대한 오해

"Spring Batch는 스케줄러가 아니다."
Spring Batch를 제대로 접하기 전까지 나는 Batch가 특정 작업을 일정 주기마다 반복적으로 실행시키기 위한 프레임워크로 알고 있었다. 하지만 Batch와 스케줄러는 별개의 기능이다.

Batch는 데이터를 대용량으로 일괄처리하기 위한 Job이라는 형태의 객체를 제공할 뿐이고, 해당 Job을 제공받아 실행하는 스케줄러는 별도로 정의해주어야 한다.

스케줄러의 형태는 @Scheduled어노테이션을 붙이는 가장 기본적인 형태부터 Quartz까지 다양하게 정의될 수 있다.
본 글에서는 기본적 형태의 스케줄러를 적용하여 Batch를 알아볼 것이다.

2. Batch 아키텍처

Batch는 기본적으로 아래 사진처럼 구성되어 있다. 하나씩 살펴보자.

Job Scheduler

: 정의된 배치Job을 일정 주기마다 실행시켜주는 Scheduler 객체이다. 위에서 언급했던 것처럼 Batch와는 직접적인 연관이 없다.

Job Launcher

: 실제로 배치Job을 실행시켜주는 객체이다. Scheduler와 함께 사용한다면, Scheduler 객체 안에서 Job Launcher를 활용하여 배치Job을 실행시켜주는 방식으로 사용된다. Job Instance를 구분하기 위한 Job Parameter를 함께 입력할 수 있다.

Job

: 실제 데이터를 Batch로 처리하기 위한 작업 단위이다. 비유한다면, Job Launcher는 작업자이고 Job은 작업자에게 할당된 작업의 내용이라고 보면 된다.

Step

: 정의된 Job을 효율적이고 안정적으로 처리하기 위해 작업을 나눈 단위이다. Job을 요리에 비유한다면, 요리라는 Job은 재료준비-재료손질-조리라는 과정대로 진행될 것이고, 재료준비와 같은 각 순서들이 Step이라고 볼 수 있다.

Job Repository

: 정의된 Job의 처리를 위한 메타데이터가 저장되어있는 저장소이다. 해당 메타데이터는 위에서 언급한 Batch의 대용량 처리를 위한 기능(로깅/추적, 작업재시작, 건너뛰기 등)에 활용된다. 위에서 비유한 것처럼 Job이 요리라면, 나의 요리일지(?)나 미리 손질된 재료들의 정보들이 메타데이터이고 Job Repository에 기록된다고 보면 된다.

tasklet 방식

tasklet은 Spring Batch의 Step을 구현하는 기본 방식이다.

chunk 방식

chunk방식은 chunk 단위 만큼 Transaction을 수행하게 할 수 있는 방식이다.
step을 정의할 때, chunk의 수를 정할 수 있다. step이 한번 수행될 때마다 chunk 수만큼 transaction이 실행되므로
step 실행 도중 실패시, rollback도 chunk수만큼 수행된다.

chunk방식은 step이 아래 3개 모듈로 나눠서 실행된다.

ItemReader

: 데이터를 읽는 모듈이다. File을 읽을 수도 있고, DB를 읽을 수도 있다.

ItemProcessor

: 데이터를 처리하는 모듈이다. 데이터를 저장하기 전에 가공이 필요할 경우, 해당 모듈에서 진행하게 된다.
해당 모듈은 option이다.

ItemWriter

: 데이터를 저장하는 모듈이다. DB에 저장하거나 File로 남길 수 있다.

3. 예제

  • Job Scheduler
    : JobLauncher를 활용하여 주기적으로 job을 실행시켜주는 객체
@Configuration
@RequiredArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;
    private final ExampleJobConfig exampleJobConfig ;

    @Scheduled(cron = "1 * * * * *")
    public void jobScheduled() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException {

            // make job parameter
            Map<String, JobParameter> jobParametersMap = new HashMap<>();
            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
            Date time = new Date();
            String time1 = format1.format(time);
            jobParametersMap.put("date",new JobParameter(time1));
            JobParameters parameters = new JobParameters(jobParametersMap);

            // job start
            JobExecution jobExecution = jobLauncher.run(exampleJobConfig.exampleJob, parameters);

    }

}
  • Job (tasklet 방식)
    : 실제 실행되는 job 정의
@Configuration
@RequiredArgsConstructor
public class ExampleJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job exampleJob() {
        return jobBuilderFactory.get("exampleJob")
                .start(exampleStep())
                .build();
    }

    @Bean
    public Step exampleStep() {
        return stepBuilderFactory.get("exampleStep")
                .tasklet(new ExampleTasklet())
                .build();
    }

}
  • tasklet
@Slf4j
public class ExampleTasklet implements Tasklet, StepExecutionListener {

    @Override
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        log.info("Before Step");
    }

    @Override
    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {

        log.info("After Step");

        return ExitStatus.COMPLETED;
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

        //비즈니스 로직
        log.info("Business Logic");

        return RepeatStatus.FINISHED;
    }

}
  • Job (chunk 방식)
    : 실제 실행되는 job 정의, Reader, Processor, Writer를 각각 정의해주어야 한다. (외부 클래스로 빼도 됨)
@Configuration
@RequiredArgsConstructor
public class ExampleJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job exampleJob() {
        return jobBuilderFactory.get("exampleJob")
                .start(exampleStep())
                .build();
    }

    @Bean
    public Step exampleStep() {
        return stepBuilderFactory.get("exampleStep")
                .<Member,Member>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Member> reader() throws Exception {

        Map<String,Object> parameterValues = new HashMap<>();
        parameterValues.put("amount", "10000");

        //pageSize와 fethSize는 동일하게 설정
        return new JdbcPagingItemReaderBuilder<Member>()
                .pageSize(10)
                .fetchSize(10)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Member.class))
                .queryProvider(customQueryProvider())
                .parameterValues(parameterValues)
                .name("JdbcPagingItemReader")
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Member, Member> processor(){

        return new ItemProcessor<Member, Member>() {
            @Override
            public Member process(Member member) throws Exception {

                //1000원 추가 적립
                member.setAmount(member.getAmount() + 1000);

                return member;
            }
        };
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Member> writer(){
        return new JdbcBatchItemWriterBuilder<Member>()
                .dataSource(dataSource)
                .sql("UPDATE MEMBER SET AMOUNT = :amount WHERE ID = :id")
                .beanMapped()
                .build();

    }

    public PagingQueryProvider customQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();

        queryProviderFactoryBean.setDataSource(dataSource);

        queryProviderFactoryBean.setSelectClause("SELECT ID, NAME, EMAIL, NICK_NAME, STATUS, AMOUNT ");
        queryProviderFactoryBean.setFromClause("FROM MEMBER ");
        queryProviderFactoryBean.setWhereClause("WHERE AMOUNT >= :amount");

        Map<String,Order> sortKey = new HashMap<>();
        sortKey.put("id", Order.ASCENDING);

        queryProviderFactoryBean.setSortKeys(sortKey);

        return queryProviderFactoryBean.getObject();

    }

}

이어서

다음은 실제 진행했던 개발 내용과 겪었던 시행착오에 대해 다룰 것이다.

반응형

+ Recent posts