728x90
반응형

Spring

File to DB 개발의 마무리 단계인 Spring Scheduler 개발과정에 대해 정리할 것이다.

이전 게시글까지 File to DB 배치개발은 모두 완료되었다.
대용량의 File 데이터를 읽어서 DB저장하는 부분까지는 우여곡절 끝에 구현하였다. 👉 이전글 참고

아직 남은 요구사항

그리고 아직 남은 요구사항이 있었다.

  1. File to DB 배치의 실행주기는 관리자가 임의로 조정 가능할 것.
  2. 관리자의 조정은 런타임 환경에서도 가능해야 함.
  3. 배치 기능은 실행 서버를 선택할 수 있어야 함.

현재 아래와 같은 @Scheduled 어노테이션 설정 방식으로는 런타임 환경에서 관리자가 임의로 스케줄러 설정을 변경하지 못할 뿐만 아니라,
스케줄러 실행환경도 임의로 선택하지 못한다.

이러한 점을 개선하기 위해 Scheduler 기능의 추가적인 개발이 필요했다.

  • JobScheduler.java
@Component
@RequiredArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;

    private final InterfaceFileReaderBatchJobConfig interfaceFileReaderBatchJobConfig;

    private final InterfaceReader interfaceReader;

    // 스케줄러 주기 설정, 어노테이션 선언 방식
    @Scheduled(cron = "0 * * * * *")
    public void runJob() throws IOException {

        ...

    }

}



Scheduler 동작원리

Scheduler 기능 개발에 앞서서 동작원리를 간단하게 살펴본다.




위 사진처럼 Scheduler를 통해 실행될 Task는 TaskScheduler에 주입되어 사용되고, Scheduler의 실행정보는 Trigger에 주입되어 사용된다.
그리고 Scheduler 기능은 위 두가지 추상클래스를 이용하여 실행되는 구조이다.

Scheduler 초기화 순서

  1. Scheduler가 실행될 root 클래스에 선언된 @EnableScheduling 어노테이션을 통해 Scheduler 초기화 작업이 진행된다.
  2. @EnableScheduling 어노테이션에 import된 SchedulingConfiguration 클래스가 ScheduledAnnotationBeanPostProcessor를 Bean으로 등록 ( ScheduledAnnotationBeanPostProcessor을 통해 스케줄링에 필요한 실질적인 초기화 작업이 수행됨 )
  3. ScheduledAnnotationBeanPostProcessor 클래스가 @Scheduled 어노테이션이 붙은 메소드를 스케줄러로 등록
  4. @Scheduled 어노테이션에 설정된 값(ex. fixedDelay, cron 등)에 맞게 task를 생성하여 ScheduledTaskRegistrar에 등록, 실제 작업은 Runnable에 할당 ( cron 표기법이 사용되었을 경우, CronTrigger가 등록됨 )

위 과정을 통해 ScheduledTaskRegistrar에 task가 등록되면, TaskScheduler가 등록된 task를 일정 주기에 맞게 실행시켜 준다.

TaskScheduler 동작 순서

  1. TaskScheduler 클래스의 schedule 메소드가 파라미터로 task정보를 받아 실행된다.
  2. 일정주기 이후에 실행될 작업을 생성하기 위해 파라미터로 받은 task 정보를 기반으로 ReschedulingRunnable 을 생성한다.
  3. 일정주기 이후에 run 메소드가 실행되어 작업이 수행되면, 다시 schedule 메소드를 호출하여 ReschedulingRunnable 을 생성한다.
    (반복)

위 과정이 반복되면서 등록된 task가 일정주기마다 실행된다.

Scheduler 기능 개발 : ThreadPoolTaskScheduler 활용

지금까지 Scheduler의 동작원리를 살펴보았다.
이제 다시 돌아와서, 요구사항을 충족시키기 위해 Scheduler의 추가적인 개발을 진행해보자.

요구사항에 따르면,
Scheduler는 어느 서버에서 실행되어야 하는지, 얼만큼의 주기를 가져야 하는지 런타임 환경에서 조정할 수 있어야 한다.
이를 위해 ThreadPoolTaskScheduler 객체를 활용할 수 있다.

ThreadPoolTaskScheduler란?

ThreadPoolTaskSchedulerConcurrentTaskScheduler 와 같이 TaskScheduler에 등록될 수 있는 Bean이다.
둘 중 ConcurrentTaskScheduler@EnableScheduling 어노테이션을 통해 default로 TaskScheduler에 등록되어,
위에서 언급한 Scheduler 초기화 작업을 수행한다.

다른 하나인 ThreadPoolTaskScheduler 는 사용자가 해당 Bean을 생성하면, @EnableScheduling 어노테이션을 통해 추가적으로 TaskSchedulerThreadPoolTaskScheduler Bean도 등록해준다.

그리고 우리는 ThreadPoolTaskScheduler 를 통해 커스터마이징한 Scheduler를 등록할 수 있다.

위와 같이 Scheduler를 커스터마이징할 수 있는 ThreadPoolTaskScheduler 의 강력한 특징을 활용하여 런타임환경에서도 Scheduler의 설정을 변경할 수 있도록 추가적인 기능 개발을 진행할 것이다.

Scheduler 기능 개발 수행

기능 개발을 위해 아래와 같이 코드를 구성하였다.

  • ThreadPoolTaskSchedulerConfig.java
@Configuaration
public class ThreadPoolTaskSchedulerConfig {

    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);                        // Scheduler task 실행을 위한 thread의 갯수
        taskScheduler.setThreadNamePrefix("jobScheduler");    // thread 이름
        taskScheduler.initialize();                            // 초기화
        return taskScheduler;
    }

}

위 설정 코드를 통해 Scheduler task 수행을 위한 thread를 몇 개를 생성할지 결정하였다.
아직 Scheduler 기능은 소규모이고, 배치 job들이 비동기로 수행되어 발생되는 예기치못한 오류들을 방지하기 위해
일단 1개만 생성하기로 하였다.

Scheduler task 수행에 멀티스레딩 방식이 필요할 시,
해당 thread의 갯수를 늘리고, 배치 job간 공유하여 사용하고 있는 자원들을 파악하여 멀티스레딩 환경에 맞게 추가개발이 필요할 것이다.

  • SchedulerConfig.java
@Table(name = "TB_SCHEDULER_CONFIG")
@Data
@NoArgsConstructor(access = AccessLevel.PUBLIC)
@Entity
public class SchedulerConfig {

    @Id
    @Column(name = "scheduler_idx", length = 2, nullable = false)
    private String schedulerIdx;

    @Column(name = "scheduler_name", length = 50, nullable = false)
    private String schedulerName;

    @Column(name = "cron", length = 50, nullable = false)
    private String cron;

    @Column(name = "execution_yn", length = 1, nullable = false)
    private String executionYn;

    @Column(name = "execution_ip", length = 20, nullable = false)
    private String executionIp;

    @Column(name = "last_chg_dt")
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date lastChgDt;

}
  • SchedulerConfigRepository.java
public interface SchedulerConfigRepository extends JpaRepository<SchedulerConfig, Long> {

    SchedulerConfig findBySchedulerIdxAndSchedulerName(String scheduler_idx, String scheduler_name);

}

위와 같이 각 Scheduler의 설정정보를 담기 위한 테이블 및 Entity를 생성하였다.
각 필드에 대한 desc은 다음과 같다.

  1. scheduler_idx : Scheduler를 식별하기 위한 index
  2. scheduler_name : Scheduler의 이름 (index와 매핑)
  3. cron : Scheduler 실행 주기를 결정할 cron식
  4. execution_yn : AP 내부에서의 Scheduler 실행여부를 결정
  5. execution_ip : Scheduler가 실행될 서버의 ip
  6. last_chg_dt : 설정 최종변경 일시

런타임환경 실행주기 변경, 특정 서버에서만 실행, 실행 여부 변경이라는 요구사항을 충족시키기 위해
위와 같이 설정테이블을 구성한 것이다.

그리고 각 Scheduler가 위 테이블에 저장된 설정 값을 기반으로 실행될 수 있도록 Scheduler 코드를 작성할 것이다.

  • Sftp1Scheduler.java
@Slf4j
@Component
@RequiredArgsConstructor
public class Sftp1Scheduler {

    // Spring Batch Bean
    private final JobLauncher jobLauncher;
    private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
    private final InterfaceFileReaderBatchJobConfig interfaceFileReaderBatchJobConfig;
    private final Sftp1Reader sftp1Reader;

    // Scheduler 설정 테이블 Jpa Repository
    private final SchedulerConfigRepository schedulerConfigRepository;

    // Scheduler 식별자
    private String schedulerIdx = "3";
    private String schedulerName = "Sftp1";

    // Scheduler 설정 값
    private String executionIp;
    private String executionYn;
    private String cron;

    // Scheduler가 실행시킬 task
    private ScheduledFuture<?> future;

    // 초기화
    @PostConstruct // AP가 실행되고 Spring 초기화 과정에서 메소드가 실행될 수 있도록 선언
    public void init() {
        try {
            if (!this.startScheduler()) {
                log.error();
            }
        }
        catch (Exception e) {
               log.error();
        }
    }

    // 스케줄러 시작
    public boolean startScheduler() throws Exception {
        if (this.future != null)
            return false;
        SchedulerConfig schedulerConfig = schedulerConfigRepository.findBySchedulerIdxAndSchedulerName(this.schedulerIdx, this.schedulerName);
        if (!validateExecution(schedulerConfig)) {
            return false;
        }
        this.executionIp = schedulerConfig.getExecutionIp();
        this.executionYn = schedulerConfig.getExecutionYn();

        this.cron = schedulerConfig.getCron();
        ScheduledFuture<?> future = this.threadPoolTaskScheduler.schedule(this.getRunnable(), this.getTrigger());
        this.future = future;
        return true;
    }

    // 스케줄러 종료
    public void stopScheduler() {
        if (this.future != null)
            this.future.cancel(true);
        this.future = null;
    }

    // 런타임 스케줄러 주기 변경
    public void changeCron(String cron) throws Exception {
        this.saveCron(cron);
        this.stopScheduler();
        this.cron = cron;
        this.startScheduler();
    }

    // 여기부터 클래스 내부에서만 쓸 메소드들
    private boolean validateExecution(SchedulerConfig schedulerConfig) throws UnknowsHostException {
        if (schedulerConfig == null) {
            return false;
        }
        String localIp = InetAddress.getLocalHost().getHostAddress();    // 현재 실행중인 서버 ip 조회

        if ("N".equals(schedulerConfig.getExecutionYn()) || !localIp.equals(schedulerConfig.getExecutionIp())) {
            return false;
        }
        return true;
    }

    // 스케줄러가 실행시킬 task
    private Runnable getRunnable() {
        return () -> {
            try {

                Step interfaceStep = interfaceFileReaderBatchJobConfig.interfaceStep(sftp1Reader.resultFlatFileItemReader);

                Map<String, JobParameter> confMap = new HashMap<>();
                confMap.put("time", new JobParameter(System.currentTimeMillis()));
                JobParameters jobParameters = new JobParameters(confMap);

                jobLauncher.run(interfaceFileReaderBatchJobConfig.interfaceJob(interfaceStep), jobParameters);

            }
            catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException | JobParametersInvalidException | JobRestartException e) {
                log.error();
            }
            catch (Exception e) {
                log.error();
            }
        }
    }

    // 스케줄러의 trigger
    private Trigger getTrigger() {
        return new CronTrigger(this.cron);
    }

    @Transactional
    private void saveCron(String cron) throws Exception {
        SchedulerConfig schedulerConfig = new SchedulerConfig();
        schedulerConfig.setSchedulerIdx(this.schedulerIdx);
        schedulerConfig.setSchedulerName(this.schedulerName);
        schedulerConfig.setCron(cron);
        schedulerConfig.setLastChgDt(new Date());
        schedulerConfig.setExecutionIp(this.executionIp);
        schedulerConfig.setExecutionYn(this.executionYn);
        schedulerConfigRepository.save(schedulerConfig);
    }

}

위와 같이 Scheduler 코드를 작성하여 런타임 환경에서 사용자 임의대로 스케줄러 설정을 변경할 수 있도록 하였다.
초기화 flow는 다음과 같다.

초기화 flow

  • AP 실행 및 Spring 초기화
    • init 메소드 실행
      • startScheduler 메소드 실행
        • 스케줄러 설정 테이블 값 조회
        • 조회 값 기반 validateExecution 메소드 실행
          • 서버ip, 실행여부 확인 및 결정
        • threadPoolTaskScheduler.schedule 실행
          • 다음 주기에 실행될 task 등록


일단 위 코드를 통해 임의 서버에서 실행여부에 맞게 Scheduler가 실행되도록 startScheduler, validateExecution 메소드를 작성하였다.
그리고 changeCron 및 stopScheduler 메소드를 작성하여 런타임환경에서 사용자의 요청에 따라 Scheduler를 제어할 수 있도록 준비하였다.

이제 실제로 사용자의 요청을 받기 위한 Controller 구현이 필요했다.

  • RequestSchedulerDto.java
@Data
public class RequestSchedulerDto {

    @JsonProperty("schedulerIdx")
    private String schedulerIdx;

    @JsonProperty("schedulerName")
    private String schedulerName;

    @JsonProperty("cron")
    private String cron;

}
  • ResponseSchedulerDto.java
@Data
public class ResponseSchedulerDto {

    @JsonProperty("rsltcd")
    private String rsltCd;

    @JsonProperty("errMsg")
    private String errMsg;

}
  • SchedulerController.java
@Controller
@RequiredArgsConstructor
@Slf4j
@RequestMapping("/scheduler")
public class SchedulerController {

    // 스케줄러 Bean 주입
    private final Sftp1Scheduler sftp1Scheduler;
    private final Sftp2Scheduler sftp2Scheduler;

    // sheduler 작업 시작
    @PostMapping("/start")
    @ResponseBody
    public ResponseEntity<?> requestStartScheduler(@RequestBody RequestSchedulerDto requestSchedulerDto) throws InterruptedException {
        ResponseSchedulerDto responseSchedulerDto = new ResponseSchedulerDto();
        String schedulerIdx = responseSchedulerDto.getSchedulerIdx();
        String schedulerName = responseSchedulerDto.getSchedulerName();
        boolean returnFlag = true;

        if (schedulerIdx == null || schedulerName == null) {
            responseSchedulerDto.setRsltCd("E1");
            responseSchedulerDto.setErrMsg("no request data");
        }

        try {
            if ("1".equals(schedulerIdx) && "InterfaceResult".equals(schedulerName)) {
                returnFlag = sftp1Scheduler.startScheduler();
            }
            else if ("2".equals(schedulerIdx) && "InterfaceResult2".equals(schedulerName)) {
                returnFlag = sftp2Scheduler.startScheduler();
            }
            else {
                log.error("no target scheduler");
                responseSchedulerDto.setRsltCd("E4");
                responseSchedulerDto.setErrMsg("no target scheduler");
                return new ResponseEntity<>(responseSchedulerDto, HttpStatus.OK);
            }

            if (returnFlag == false) {
                responseSchedulerDto.setRsltCd("E3");
                responseSchedulerDto.setErrMsg("already started scheduler or execution conditions not met");
            }
            responseSchedulerDto.setRsltCd("S");
            responseSchedulerDto.setErrMsg("");
        }
        catch (Exception e) {
            log.error(e.getMessage());
            responseSchedulerDto.setRsltCd("E2");
            responseSchedulerDto.setErrMsg("internal server error");
        }
        return new ResponseEntity<>(responseSchedulerDto, HttpStatus.OK);
    }

    // scheduler 작업 종료
    @PostMapping("/stop")
    @ResponseBody
    public ResponseEntity<?> requestStopScheduler(@RequestBody RequestSchedulerDto requestSchedulerDto) throws InterruptedException {
        ResponseSchedulerDto responseSchedulerDto = new ResponseSchedulerDto();
        String schedulerIdx = responseSchedulerDto.getSchedulerIdx();
        String schedulerName = responseSchedulerDto.getSchedulerName();

        if (schedulerIdx == null || schedulerName == null) {
            responseSchedulerDto.setRsltCd("E1");
            responseSchedulerDto.setErrMsg("no request data");
        }

        try {
            if ("1".equals(schedulerIdx) && "InterfaceResult".equals(schedulerName)) {
                returnFlag = sftp1Scheduler.stopScheduler();
            }
            else if ("2".equals(schedulerIdx) && "InterfaceResult2".equals(schedulerName)) {
                returnFlag = sftp2Scheduler.stopScheduler();
            }
            else {
                log.error("no target scheduler");
                responseSchedulerDto.setRsltCd("E4");
                responseSchedulerDto.setErrMsg("no target scheduler");
                return new ResponseEntity<>(responseSchedulerDto, HttpStatus.OK);
            }

            responseSchedulerDto.setRsltCd("S");
            responseSchedulerDto.setErrMsg("");
        }
        catch (Exception e) {
            log.error(e.getMessage());
            responseSchedulerDto.setRsltCd("E2");
            responseSchedulerDto.setErrMsg("internal server error");
        }
        return new ResponseEntity<>(responseSchedulerDto, HttpStatus.OK);
    }

    // cron식 변경
    @PostMapping("/changecron")
    @ResponseBody
    public ResponseEntity<?> requestChangeTrigger(@RequestBody RequestSchedulerDto requestSchedulerDto) throws InterruptedException {
        ResponseSchedulerDto responseSchedulerDto = new ResponseSchedulerDto();
        String schedulerIdx = responseSchedulerDto.getSchedulerIdx();
        String schedulerName = responseSchedulerDto.getSchedulerName();
        String cron = responseSchedulerDto.getCron();

        if (schedulerIdx == null || schedulerName == null || cron == null) {
            responseSchedulerDto.setRsltCd("E1");
            responseSchedulerDto.setErrMsg("no request data");
        }

        try {
            if ("1".equals(schedulerIdx) && "InterfaceResult".equals(schedulerName)) {
                returnFlag = sftp1Scheduler.changeCron();
            }
            else if ("2".equals(schedulerIdx) && "InterfaceResult2".equals(schedulerName)) {
                returnFlag = sftp2Scheduler.changeCron();
            }
            else {
                log.error("no target scheduler");
                responseSchedulerDto.setRsltCd("E4");
                responseSchedulerDto.setErrMsg("no target scheduler");
                return new ResponseEntity<>(responseSchedulerDto, HttpStatus.OK);
            }

            responseSchedulerDto.setRsltCd("S");
            responseSchedulerDto.setErrMsg("");
        }
        catch (Exception e) {
            log.error(e.getMessage());
            responseSchedulerDto.setRsltCd("E2");
            responseSchedulerDto.setErrMsg("internal server error");
        }
        return new ResponseEntity<>(responseSchedulerDto, HttpStatus.OK);
    }

}

위와 같이 코드를 작성하여 실제 사용자가 scheduler 작업을 controll 할 수 있게 하였다.

Controller는 아래와 같이 3개로 구성하였다.

  • requestStartScheduler
  • requestStopScheduler
  • requestChangeTrigger


Controller의 구성은 셋 다 비슷한데, 기본적인 flow는 아래와 같다.

  1. Rest API body 값을 읽어서 사용자의 요청 값을 받는다.
  2. 받은 요청 값을 통해 어떤 Scheduler를 실행시킬지 결정한다. (if-else)
  3. Scheduler가 결정되면, 요청에 맞는 Scheduler의 메소드를 실행시킨다.

개선할 점.

  1. Scheduler 클래스 -> 인터페이스 및 부모-자식 클래스로 변경
    : 각 Scheduler 클래스의 구성은 비슷하다. Interface 및 부모-자식 클래스로 리팩토링 하여
    코드의 양을 현저하게 줄이고, 코드의 유지보수성을 높일 수 있다. 그리고 이를 통해 Controller의 if-else문을 개선할 수 있을 것 같다.
  2. Controller if-else
    : 현행은 if-else문을 통해 Scheduler를 식별하는 방식이다. 위 1번 항목에서 리팩토링된 코드를 활용하면
    scheduler가 추가될 때마다 else if를 추가하지 않아도 작동될 수 있도록 개발할 수 있지않을까 생각된다. => 검토 필요

위에서 언급한 개선 점은 👉다음글에서 어느정도 보완하였다.

마무리

본 글을 마지막으로 File to DB 개발기를 끝낸다.

이외에도 비대면 고객케어 솔루션 프로젝트를 통해 개발한 부분이 있다. (Spring Security, SOAP연동)
해당 부분은 좀 더 정리가 되면 이어서 작성할 것이다.

File to DB 개발을 수행했던 3월 한달 간 너무나 재밌고 유익한 개발을 경험했다.
Spring Boot의 기본 동작원리 뿐만 아니라, Scheduler, Batch 등과 같은 Spring 모듈에 대해서도 알게되어 좋았다.

또, 개발하며 느낀건... 내가 미지의 부분을 개발하며 하나씩 알아가는 것에 재미를 느낀다는 것을 알게 되었다.
개발하며 결과물을 내는 것도 좋지만, 앞으로는 개발을 진행하며 궁금증을 하나씩 해소해 나가는 과정에 초점을 두어야겠다.
그리고 이를 통해 꾸준하게 성장해 나갈 것이다.

반응형
728x90
반응형

Spring

현재까지 File을 읽어서 DB에 저장하는 기본적인 Batch 로직을 구성하였다. 👉이전글 참고

동일 File 반복 Read 에러 발생

현상

Batch 동작을 통해 SFTP 기능 테스트를 진행하는 도중에 한 가지 오류를 발견했다.
아래처럼 의도한대로 기능이 동작하지 않는 것이다.

  • 의도
  1. 주기적으로 Batch Job이 실행되어 Read 대상의 File을 선정한다.
  2. Read할 파일의 파일명 끝에 _END가 붙어있으면 skip하고 다음 대상을 탐색한다.
  3. Read 대상파일 선정 후, Read 작업이 끝나면 해당 파일의 파일명 끝에 _END를 붙여준다.

파일명 변경 로직은 아직 개발되지 않아서 수동으로 파일명 변경하고 테스트를 반복하였다.

프로그램이 대상파일을 Read하면 해당 파일에 수동으로 _END를 붙여주고,
다음 Batch Job 동작 시, 다른 대상파일을 탐색하는지 확인하는 테스트를 진행하였다.
하지만, 프로그램은 다른 파일을 탐색하지 않고 기존에 Read했던 파일만 반복적으로 Read하였다.

같은 파일만 지속적으로 Read하는 것을 봤을 때, 배치 job이 Read할 File을 새롭게 갱신을 못하는 것 같았다.
배치 job 실행 때마다 Step Bean이 새로 생성될 수 있도록 초점을 맞추고 해결방안을 고민하였다.

@StepScope

@StepScope는 배치 job step의 tasklet이나 itemReader, itemWriter에 붙는 어노테이션이다.
해당 어노테이션이 붙은 Bean은 Spring 컨테이너 실행시점에 생성되는 것이 아닌, 해당 Step이 실행되는 시점에 생성된다.
(Spring Bean의 기본 Scope는 singleton이기 때문에 생성시점은 중요하다.)

  • 사용이유
    : Spring Batch의 Job은 기본적으로 실행될 때, JobParameters가 함께 입력되어 실행된다. 이 때, Job Instance를 구분하는 기준은 JobParameters이고, 중복 JobParameters의 Job은 생성되지 않는다.

하지만 배치 Job Bean 생성 시점은 Spring Container 생성 시점이기 때문에, Bean 생성작업은 한번만 실행된다. 그렇기 때문에 실행 Job은 JobParameters로 구분할 수 있겠지만, 동작은 항상 동일할 것이다.

이러한 Batch의 맹점을 보완한 것이 @StepScope@JobScope이고, JobParameters활용 시에는 해당 어노테이션 활용이 병행되어야 한다.

어떻게 보완했나?

나도 역시 위에서 언급한 Batch의 맹점을 발견하였다.
해당 맹점은 위에서 언급했다시피 배치 job동작 시, 동일 file만 반복적으로 Read하는 현상이다.

해당 현상은 배치 job의 초기화가 Spring Container 생성 시점에 진행되기 때문에,
초기화 시점에 선정된 대상 file만 반복적으로 Read하는 것이라고 판단했다. (=> 더 알아봐야할 듯.)

해당 현상을 개선하기 위해 아래 코드처럼 @StepScope어노테이션을 선언하였다.

  • ResultReader.java
@Configuration
@RequiredArgsConstructor
public class InterfaceReader {

    @Bean
    @StepScope // 어노테이션 추가
    public <T> FlatFileItemReader<T> resultFlatFileItemReader() throws IOException {

        String targetFile = null;
        List<String> findFileList = new ArrayList<String>();
        String rootDir = "/save/";
        String fileNamePrefix = "INTERFACE_";
        String fileFullname = rootDir + fileNamePrefix;

        FlatFileItemReader<T> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("UTF-8");

        try {

            // FileReader 설정
            setFileReaderConfiguration(Result.class, new ResultFieldSetMapper(), flatFileItemReader);

            Path dirPath = Paths.get(rootDir);
            Stream<Path> walk = Files.walk(dirPath, 1);
            List<Path> fileList = walk.collect(Collectors.toList());

            // Read 대상 파일 찾기
            for (Path path : fileList) {
                int extensionIdx = path.toString().lastIndexOf(".");
                if (path.toString().startsWith(fileFullname) && extensionIdx != -1 && !path.toString().substring(0, extensionIdx).endsWith("END")) {
                    targetFile = path.toString();
                    findFileList.add(path.toString());
                }
            }

            flatFileItemReader.setResource(new FileSystemResource(targetFile));

        }
        catch (NoSuchFileException e) {
            System.out.println(e.toString());
        }
        catch (NullPointerException e) {
            System.out.println(e.toString());
        }
        catch (Exception e) {
            System.out.println(e.toString());
        }

        return flatFileItemReader;
    }

    private <T> void setFileReaderConfiguration(Class<T> c, FieldSetMapper<T> mapClass, FlatFileItemReader<T> flatFileItemReader) {

        ... (중략)

    }

}

@StepScope어노테이션을 위 코드처럼 선언하게 되면,
해당 Step이 실행될 때마다 Bean 생성 과정을 반복하게 되어 Read 대상파일을 갱신하게 된다.

리팩토링

File to DB 개발은 총 세가지 종류의 파일을 읽기 위해 시작되었다.
그리고 파일종류마다 각각의 Batch Job과 Step, Reader 등을 구성하여 기능을 구현하였다.

그러다보니 중복되는 부분이 많았다. ( ItemReader로직 동일 )
컨셉은 File을 Read해서 DB에 저장하는 것으로 동일하니, 어쩔 수 없는 부분이었고 공통로직을 모듈화시켜서 효율성을 증대시키고 싶었다.

ItemReader를 공통모듈 형태로 따로 빼고, 객체형태로 Parameter를 받아서 실행시키기 위해
아래와 같이 코드를 리팩토링 하였다.

  • application.yml
ENV_POSTGRESQL: 127.0.0.1
ENV_POSTGRESQL_PORT: 5432
ENV_POSTGRESQL_DB: batch
ENV_POSTGRESQL_SCHEMA: daewoong
POSTGRESQL_USERNAME: postgres
POSTGRESQL_PASSWORD: postgres

DIR_DELIMITER: /
SFTP_ROOT: /sftp/
SFTP_DEST1: /sftp/1/
SFTP_DEST2: /sftp/2/
SFTP_DEST3: /sftp/3/

server:
    port: 8080

spring:
    main:
        allow-bean-definition-overriding: true

    datasource:
        url: jdbc:postgresql://${ENV_POSTGRESQL}:${ENV_POSTGRESQL_PORT}/${ENV_POSTGRESQL_DB}
        username: ${POSTGRESQL_USERNAME}
        password: ${POSTGRESQL_PASSWORD}

    jpa:
        hibernate:
            ddl-auto: create
        database-platform: org.hibernate.dialect.PostgreSQLDialect
        properties:
            hibernate:
                format_sql: true
                show_sql: true

5개의 환경변수를 추가하였다. ( DIR_DELIMITER, SFTP_ROOT, SFTP_DEST1, SFTP_DEST2, SFTP_DEST3 )

  1. DIR_DELIMITER : 디렉토리 path 구분자, window와 linux의 형태가 다르기 때문에 따로 입력. 파일처리에 활용
  2. SFTP_ROOT : Read 대상의 파일들이 있는 path
  3. SFTP_DEST : Read 후, 파일들이 저장되어야할 path. 파일처리에 활용

  • ReaderRequestDto.java
@Component
@Data
public class ReaderRequestDto {

    private String rootDir;                // 파일 저장 경로
    private String destDir;                // END파일 저장 경로
    private String fileNamePrefix;        // 파일이름 접두사
    private String delimiter;            // 구분자 => 파일데이터 구분, 디렉토리 path 구분자와 혼동x
    private String[] columnNames;        // 컬럼명
    private String targetFile;            // 읽은 파일

}

위와 같이 공통 모듈의 파라미터로 활용할 객체를 선언하였다.

  1. fileNamePrefix : 각 종류의 파일은 접두사로 구분되기 때문에 해당 값을 활용하여 대상 파일을 탐색하였다.
  2. columnNames : FlatFileItemReader는 파일데이터를 컬럼명에 알맞게 매핑하기 위해 따로 컬럼명을 입력받아 설정해주어야 한다. 이를 위해 필요한 값이다.
  3. targetFile : job이 한번 실행될 때, read된 파일이다. 파일처리에 필요한 Listener에서 활용하기 위해 필요한 값이다.

  • CommonReader.java
@Component
@RequiredArgsConstructor
public class CommonReader {

    private final ReaderRequestDto readerRequestDto;

    public <T> FlatFileItemReader<T> commonFlatFileItemReader(Class<T> c, FieldSetMapper<T> mapClass throws IOException {

        String targetFile = null;
        List<String> findFileList = new ArrayList<String>();
        String rootDir = readerRequestDto.getRootDir();                    // parameter 입력
        String fileNamePrefix = readerRequestDto.getFileNamePrefix();    // parameter 입력
        String fileFullname = rootDir + fileNamePrefix;

        FlatFileItemReader<T> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("UTF-8");

        try {

            // FileReader 설정
            setFileReaderConfiguration(Result.class, new ResultFieldSetMapper(), flatFileItemReader);

            Path dirPath = Paths.get(rootDir);
            Stream<Path> walk = Files.walk(dirPath, 1);
            List<Path> fileList = walk.collect(Collectors.toList());

            // Read 대상 파일 찾기
            for (Path path : fileList) {
                int extensionIdx = path.toString().lastIndexOf(".");
                if (path.toString().startsWith(fileFullname) && extensionIdx != -1 && !path.toString().substring(0, extensionIdx).endsWith("END")) {
                    targetFile = path.toString();
                    findFileList.add(path.toString());
                }
            }
            readerRequestDto.setTargetFile(targetFile);        // parameter 세팅

            flatFileItemReader.setResource(new FileSystemResource(targetFile));

        }
        catch (NoSuchFileException e) {
            System.out.println(e.toString());
        }
        catch (NullPointerException e) {
            System.out.println(e.toString());
        }
        catch (Exception e) {
            System.out.println(e.toString());
        }

        return flatFileItemReader;

    }

    private <T> void setFileReaderConfiguration(Class<T> c, FieldSetMapper<T> mapClass, FlatFileItemReader<T> flatFileItemReader) {

        // File 내 구분자 설정
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer(readerRequestDto.getDelimiter());    // paramter 입력
        // 구분자 기준, 저장될 컬럼 이름 및 순서 지정
        delimitedLineTokenizer.setNames("NAME", "DATE", "SEQ", "STRING");

        // Mapper 설정
        DefaultLineMapper<T> defaultLineMapper = new DefaultLineMapper<>();
        BeanWrapperFieldSetMapper<T> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        if (mapClass != null) {
            defaultLineMapper.setFieldSetMapper(mapClass);
        }
        else {
            beanWrapperFieldSetMapper.setTargetType(c);
            defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        }

        flatFileItemReader.setLineMapper(defaultLineMapper);

    }

}

기존과 다르게 위 코드에서 보이는 것처럼 객체 parameter로부터 대상파일을 탐색하기 위한 root와 fileName정보를 입력받고,
file read가 끝나면 대상 파일을 다시 객체 parameter에 세팅해주는 로직이 추가되었다.

parameter를 입력받는 모듈로 구성하기 위해 @Component어노테이션을 활용하여 ItemReader가 아닌 별도 모듈형태로 Bean을 생성하도록 하였다.

  • Sftp1Reader.java
@Configuration
@RequiredArgsConstructor
public class Sftp1Reader {

    private final CommonReader commonReader;
    private final ReaderRequestDto readerRequestDto;

    @Value("${SFTP_ROOT}")
    private String rootDir;

    @Value("${SFTP_DEST1}")
    private String destDir;

    @Bean
    @StepScope
    public FlatFileItemReader<Result> resultFlatFileItemReader() throws IOException {

        String[] columnNames = {"NAME", "DATE", "SEQ", "STRING"};
        readerRequestDto.setRootDir(rootDir);
        readerRequestDto.setDestDir(destDir);
        readerRequestDto.setFileNamePrefix("INTERFACE_");
        readerRequestDto.setDelimiter("|");
        readerRequestDto.setColumnNames(columnNames);

        return commonReader.commonFlatFileItemReader(Result.class, new ResultFieldSetMapper());
    }

}

위와 같이 Step을 재구성하여 공통 모듈인 commonReader에 parameter형태로 필요한 값이 입력될 수 있도록 리팩토링하였다.

그 결과, ItemReader 로직 수정이 필요할 시 공통로직 수정을 통해 빠르게 반영하여 생산성을 제고할 수 있었다.

Read 파일 처리기능 추가

앞서 언급한 것처럼 Read된 파일들은 뒤에 _END를 붙여야 하고, 각각의 디렉토리에 이동시켜 보관해야 한다.
해당 기능은 StepExecutionListener를 활용하여 구현하였다.

StepExecutionListner

Spring Batch는 배치 job의 동작을 보조하기 위한 여러 종류의 Listener를 제공한다.

  1. JobExecutionListener : Job 단계 전/후에 동작하는 Listener
  2. StepExecutionListener : Step 단계 전/후에 동작하는 Listener
  3. ChunkListener : Chunk 단계 전/후에 동작하는 Listener
  4. ItemReadListener : ItemReader 단계 전/후에 동작하는 Listener
  5. ItemProcessorListener : ItemProcessor 단계 전/후에 동작하는 Listener
  6. ItemWriteListener : ItemWriter 단계 전/후에 동작하는 Listener
  7. SkipListener : Skip이 발생한 경우 동작하는 Listener
  8. RetryListener : Retry 전/후에 동작하는 Listener

File 데이터를 읽고 DB에 성공적으로 저장되면 파일처리를 해야하기 때문에,
StepExecutionListener를 활용하였고 ItemWriter 동작이 끝나면 실행될 수 있도록 아래와 같이 코드를 수정하였다.
( CommonStepExecutionListener DI추가 및 interfaceStep 메소드 수정 )

  • InterfaceFileReaderBatchJobConfig.java
@Configuration
@RequiredArgsConstructor
public class InterfaceFileReaderBatchJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final InterfaceProcessor interfaceProcessor;
    private final InterfaceWriter interfaceWriter;

    private final CommonStepExecutionListener commonStepExecutionListener; // DI 추가

    private static final int chunkSize = 1000;

    @Bean
    public Job interfaceJob(Step interfaceStep) throws IOException {
        return jobBuilderFactory.get("interfaceJob")
                .start(interfaceStep)
                .build();
    }

    @Bean
    public Step interfaceStep(FlatFileItemReader<Result> resultFlatFileItemReader) throws IOException {
        return stepBuilderFactory.get("interfaceStep")
                .<Result, Result>chunk(chunkSize)
                .reader(resultFlatFileItemReader)
                .processor(interfaceProcessor)
                .writer(interfaceWriter)
                .listener(commonStepExecutionListener) // Listener 추가
                .build();
    }
}

그리고 아래와 같이 실질적인 파일처리를 위한 Listener로직을 구성하였다.

  • CommonStepExecutionListener.java
@Component
@RequiredArgsConstructor
public class CommonStepExecutionListener implements StepExecutionListener {

    private final ReaderRequestDto readerRequestDto;

    @Value("${DIR_DELIMITER}")
    private String dirDelimiter;

    @Override
    public void beforeStep(StepExecution stepExecution) {

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {

        if (readerRequestDto.getTargetFile() == null) {        // Reader 동작 시, 세팅된 targetFile 값
            return stepExecution.getExitStatus();
        }

        String totalSourceName = readerRequestDto.getTargetFile();    // 처리 대상의 파일명
        String destDir = readerRequestDto.getDestDir();                // 처리 후 이동될 경로
        int extensionIdx = totalSourceName.lastIndexOf(".");        // 확장자 바로 이전의 인덱스 값 확인
        String sourceName = totalSourceName.subString(0, extensionIdx);    // 확장자 제외 파일명
        int fileNameIdx = sourceName.lastIndexOf(dirDelimiter);
        String fileName = sourceName.subString(fileNameIdx + 1, sourceName.length());    // 경로데이터 제외 파일명
        String extension = totalSourceName.substring(extensionIdx, totalSourceName.length());    // 확장자

        String destName = destDir + fileName + "_END" + extension;        // 최종 파일명 (목적지 경로 포함)

        try {
            Path source = Paths.get(totalSourceName);
            Files.move(source, source.resolveSibling(destName));    // source -> dest 파일 이동
        }
        catch (Exception e) {

        }

        return stepExecution.getExitStatus();
    }

}

이어서

이로써 Spring Batch의 기능적인 부분은 모두 구현하였다.

하지만 예상치 못한 오류를 대비해야 했다.
서버문제로 인해 해당 프로그램이 잘 동작하지 않으면, File은 처리되지 않고 지속적으로 쌓이기만 할 것이다.
그렇기 때문에, File처리를 위한 Job을 실행시키는 Scheduler를 관리자 임의대로 runtime환경에서 설정할 수 있어야 했다.

이를 위해 Scheduler 기능을 추가 구현하였다.
Spring Batch 외의 내용이지만, 다음 글에서 추가로 다룰 것이다.

반응형
728x90
반응형

Spring

비대면 고객케어 솔루션 개발 도중 타 모듈로부터 연동데이터를 File로 받아야 하는 요구사항이 들어왔다.
하루 평균 2~3만건의 데이터가 연동되고 이후 데이터량이 증가될 것을 고려하여 Spring Batch를 활용하기로 했다.

구체적인 개발 내용은 타 모듈로부터 File형태로 데이터들을 연동받으면,
Spring Batch가 주기적으로 연동받은 File을 읽어서 PostgreSQL DBdp 저장하는 것이다.

개발을 위해 Spring Batch에 대해 공부해야 했다. 👉Spring Batch란?

초기 환경설정

  • 버전 정보
    JDK 1.8, Spring Boot 2.6.4

개발 시작 전, Batch 환경을 위한 설정을 추가하였다.
(아래 코드들은 개발 시 실제 작성했던 코드와 네이밍을 다르게 하여 작성했다.)

  • application.yml
ENV_POSTGRESQL: 127.0.0.1
ENV_POSTGRESQL_PORT: 5432
ENV_POSTGRESQL_DB: batch
ENV_POSTGRESQL_SCHEMA: daewoong
POSTGRESQL_USERNAME: postgres
POSTGRESQL_PASSWORD: postgres

server:
    port: 8080

spring:
    main:
        allow-bean-definition-overriding: true

    datasource:
        url: jdbc:postgresql://${ENV_POSTGRESQL}:${ENV_POSTGRESQL_PORT}/${ENV_POSTGRESQL_DB}
        username: ${POSTGRESQL_USERNAME}
        password: ${POSTGRESQL_PASSWORD}

    jpa:
        hibernate:
            ddl-auto: create
        database-platform: org.hibernate.dialect.PostgreSQLDialect
        properties:
            hibernate:
                format_sql: true
                show_sql: true

위 설정을 간략히 요약하면,

  1. DB 연결을 위한 설정, jdbc:postgresql://127.0.0.1:5432/batch, postgres/postgres
  2. AP서버 port : 8080
  3. allow-bean-definition-overriding: true >> 수동생성 빈과 자동생성 빈의 중복문제 방지
  4. ddl-auto: create >> AP서버 부팅 시, DB 초기화
  5. database-platform: org.hibernate.dialect.PostgreSQLDialect >> PostgreSQL 방언 설정
  6. log에 SQL형태로 SQL을 나타냄.

그리고 아래와 같이 main클래스에 @EnableBatchProcessing어노테이션을 붙이면 공통 Batch 설정이 마무리 된다.
(@EnableScheduling어노테이션은 Batch와 직접적인 연관은 없지만, Batch Job을 주기적으로 실행시키기 위해 선언되어야 한다.)

  • InterfaceApplication.java
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class InterfaceApplication {

    public static void main(String[] args) {
        SpringApplication.run(InterfaceApplication.class, args);
    }

}

배치 job 구현

Spring Batch 구현방식은 Chunk-Oriented방식을 활용하였다.
전달받는 파일은 아래 format으로 구성되어 있다.
파일 상의 1개 row는 DB에 저장될 row이고, |문자를 구분자로 하여 4개 컬럼에 나눠 저장될 것이다. 컬럼 지정은 AP 코드 상에서 진행된다.

  • INTERFACE_202304182300.txt
AAA|2023-04-17|112233|test1
BBB|2023-04-17|123123|test2
CCC|2023-04-17|321321|test3
DDD|2023-04-18|332211|test4

이제 위 파일을 읽어서 DB에 저장할 배치job을 정의해야 한다.
아래는 스케줄러로 실행시킬 배치 job의 config이다.

  • InterfaceFileReaderBatchJobConfig.java
@Configuration
@RequiredArgsConstructor
public class InterfaceFileReaderBatchJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;

    private final InterfaceProcessor interfaceProcessor;
    private final InterfaceWriter interfaceWriter;

    private static final int chunkSize = 1000;

    @Bean
    public Job interfaceJob(Step interfaceStep) throws IOException {
        return jobBuilderFactory.get("interfaceJob")
                .start(interfaceStep)
                .build();
    }

    @Bean
    public Step interfaceStep(FlatFileItemReader<Result> resultFlatFileItemReader) throws IOException {
        return stepBuilderFactory.get("interfaceStep")
                .<Result, Result>chunk(chunkSize)
                .reader(resultFlatFileItemReader)
                .processor(interfaceProcessor)
                .writer(interfaceWriter)
                .build();
    }
}

위 config는 핵심 객체인 Job객체와 Job이 실행시킬 Step 객체를 생성하기 위한 설정이다.

Chunk-Oriented방식에서 Step은 데이터를 읽는 ItemReader, 데이터 처리 ItemProcessor, 데이터 쓰기 ItemWriter 3개 역할로 나눠서 수행된다.
chunkSize는 임의대로 1000으로 설정하였다. Result객체 단위로 파일에서 1000개 row씩 읽어서 데이터를 처리한다.
ItemReader의 대상은 File이기때문에 FlatFileItemReader객체를 활용하여 데이터를 파싱한다.

  • InterfaceReader.java
@Component
@RequiredArgsConstructor
public class InterfaceReader {

    public FlatFileItemReader<Result> resultFlatFileItemReader() throws IOException {

        String targetFile = null;
        List<String> findFileList = new ArrayList<String>();
        String rootDir = "/save/";
        String fileNamePrefix = "INTERFACE_";
        String fileFullname = rootDir + fileNamePrefix;

        FlatFileItemReader<Result> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("UTF-8");

        try {

            // FileReader 설정
            setFileReaderConfiguration(Result.class, new ResultFieldSetMapper(), flatFileItemReader);

            Path dirPath = Paths.get(rootDir);
            Stream<Path> walk = Files.walk(dirPath, 1);
            List<Path> fileList = walk.collect(Collectors.toList());

            // Read 대상 파일 찾기
            for (Path path : fileList) {
                int extensionIdx = path.toString().lastIndexOf(".");
                if (path.toString().startsWith(fileFullname) && extensionIdx != -1 && !path.toString().substring(0, extensionIdx).endsWith("END")) {
                    targetFile = path.toString();
                    findFileList.add(path.toString());
                }
            }

            flatFileItemReader.setResource(new FileSystemResource(targetFile));

        }
        catch (NoSuchFileException e) {
            System.out.println(e.toString());
        }
        catch (NullPointerException e) {
            System.out.println(e.toString());
        }
        catch (Exception e) {
            System.out.println(e.toString());
        }

        return flatFileItemReader;
    }

    private <T> void setFileReaderConfiguration(Class<T> c, FieldSetMapper<T> mapClass, FlatFileItemReader<T> flatFileItemReader) {

        // File 내 구분자 설정
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer("|");
        // 구분자 기준, 저장될 컬럼 이름 및 순서 지정
        delimitedLineTokenizer.setNames("NAME", "DATE", "SEQ", "STRING");

        // Mapper 설정
        DefaultLineMapper<T> defaultLineMapper = new DefaultLineMapper<>();
        BeanWrapperFieldSetMapper<T> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        if (mapClass != null) {
            defaultLineMapper.setFieldSetMapper(mapClass);
        }
        else {
            beanWrapperFieldSetMapper.setTargetType(c);
            defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        }

        flatFileItemReader.setLineMapper(defaultLineMapper);

    }

}

위 코드를 보면 파일에서 읽은 데이터를 객체에 알맞게 매핑하여 넣어줄 용도로 ResultFieldSetMapper Mapper클래스를 활용한다.
해당 클래스는 커스터마이징하여 임의로 정의한 Mapper클래스이다.
default Mapper클래스가 아닌 custom Mapper클래스를 활용하는 이유는 데이터를 받을 클래스는 Entity클래스이고, 날짜 형식이나 Not null조건과 같이 Entity 필드 조건에 맞게 데이터가 입력되어야 하기 때문에 default Mapper를 활용하면 Mapping error가 발생한다.
해당 error를 방지하기 위해 날짜형식을 가공하여 데이터를 Mapping시켜주는 Custom Mapper클래스를 정의하여 활용하였다.

아래는 해당 Mapper클래스와 Entity객체이다.

  • ResultFieldSetMapper.java
public class ResultFieldSetMapper implements FieldSetMapper<Result> {

    public Result mapFieldSet(FieldSet fieldSet) {

        Result result = new Result();

        if (fieldSet == null) {
            return null;
        }

        // 읽은 데이터의 date 형식과 entity date형식의 차이로 인한 오류를 방지
        // 아래 정의된 형식대로 데이터가 들어오지 않아 Exception이 발생하여 데이터 저장에 오류를 발생하는 것을 방지하기 위해 catch문을 통한 null입력 구문 추가
        try {
            result.setDate(fieldSet.readDate("DATE", "yyyy-MM-dd"))
        } catch {
            result.setDate(null);
        }

        result.setName(fieldSet.readString("NAME"));
        result.setSeq(fieldSet.readString("SEQ"));
        result.setString(fieldSet.readString("STRING"));

    }

}
  • Result.java
@Table(name = "tb_result")
@Data
@NoArgsConstructor(access = AccessLevel.PUBLIC)
@Entity
@IdClass(ResultId.class)
public class Result implements Serializable {

    @Id
    @Column(name = "name", length = 20, nullable = false)
    private String name;

    @Id
    @Column(name = "seq", length = 10, nullable = false)
    private String seq;

    @Column(name = "date")
    @DateTimeFormat(pattern = "yyyy-MM-dd")
    private Date date;

    @Column(name = "string", length = 20)
    private String string;

}
  • ResultId.java
public class ResultId implements Serializable {

    @Column(name = "name", length = 20, nullable = false)
    private String name;

    @Column(name = "seq", length = 10, nullable = false)
    private String seq;

}

복수 개의 ID 컬럼이 있을 경우, 위와 같이 Serializable인터페이스를 상속 받아 ID클래스를 별도로 정의하여 설정한다.

위 Job을 통해 데이터를 읽고, 아래 ResultProcessorResultWriter 객체를 통해 데이터를 처리하여 DB에 저장한다.

  • ResultProcessor.java
@Configuration
public class ResultProcessor implements ItemProcessor<Result, Result> {

    public Result process(Result result) throws Exception {
        // Result Entity 객체로 읽은 데이터의 추가 처리가 필요할 시, 해당 메소드에 처리 로직을 구성하면 된다.
        return result;
    }

}
  • ResultWriter.java
@Configuration
@RequiredArgsConstructor
public class ResultWriter implements ItemWriter<Result> {

    private final ResultRepository resultRepository;

    @Override
    public void write(List<? extends Result> list) throws Exception {

        resultRepository.saveAll(new ArrayList<Result>(list));

    }

}
  • ResultRepository.java
public interface ResultRepository extends JpaRepository<Result, Long> {

}

이제 Job 정의는 다 끝났다.

위와 같이 정의된 Job을 주기적으로 실행하기 위해 Scheduler를 정의한다.

정의된 job InterfaceFileReaderBatchJobConfig을 주입받고
주입받은 Job을 JobLauncher를 활용하여 실행하는데, Job 실행단위인 JobInstance 간의 차이를 두기 위해
JobParameters를 JobLauncher에 job과 같이 parameter로 넘겨준다.

그리고 해당 JobLauncher를 Scheduler로 실행하는 형태로 구성하여
최종적으로 일정주기마다 배치 job이 실행되는 형태를 구성할 수 있다.

  • JobScheduler.java
@Component
@RequiredArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;

    private final InterfaceFileReaderBatchJobConfig interfaceFileReaderBatchJobConfig;

    private final InterfaceReader interfaceReader;

    // 스케줄러 주기 설정
    @Scheduled(cron = "0 * * * * *")
    public void runJob() throws IOException {

        // JobParameters를 실시간 시간으로 설정
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(System.currentTimeMillis()));
        JobParameters jobParameters = new JobParameters(confMap);

        try {

            // 배치 job 실행
            jobLauncher.run(interfaceFileReaderBatchJobConfig.interfaceJob(
                interfaceFileReaderBatchJobConfig.interfaceStep(interfaceReader)), jobParameters
            );

        } catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException | JobParametersInvalidException | JobRestarException e) {
            System.out.println(e.getMessage())
        }

    }

}

이어서

지금까지 File to DB 기능을 구현하기 위한 기본적인 개발 과정을 거쳤다.
하지만 완벽한 기능 구현까지의 몇 차례의 시행착오가 있었고,
Batch의 추가적인 기능이 필요했다.

다음 글에서는 앞서 겪었던 시행착오와 Batch 추가기능을 통해 File 처리하는 개발과정을 기록할 것이다.

반응형
728x90
반응형


비대면 고객케어 개발 프로젝트를 진행하면서 spring batch를 활용할 일이 생겼다.
연동을 위해 필요한데, 타 시스템에서 파일형태로 대용량의 데이터를 넘겨주면
주기적으로 넘겨받은 파일을 읽어서 DB에 저장하는 연동이다.

연동 Flow는 간단하지만, 일 20만건 이상의 대용량의 데이터를 다루기 때문에 batch를 고려하게 되었다.

1. Spring Batch란?

Spring Batch는 Spring환경에서 대용량의 데이터 처리를 위한 기능을 제공하는 프레임워크이다.
Batch의 사전적 의미도 일괄처리인 것처럼 Spring환경에서 대용량의 데이터 처리를 위해 사용된다.

대용량의 데이터 처리를 위한 기능으로는 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기, 리소스 관리 등이 있다.

Batch에 대한 오해

"Spring Batch는 스케줄러가 아니다."
Spring Batch를 제대로 접하기 전까지 나는 Batch가 특정 작업을 일정 주기마다 반복적으로 실행시키기 위한 프레임워크로 알고 있었다. 하지만 Batch와 스케줄러는 별개의 기능이다.

Batch는 데이터를 대용량으로 일괄처리하기 위한 Job이라는 형태의 객체를 제공할 뿐이고, 해당 Job을 제공받아 실행하는 스케줄러는 별도로 정의해주어야 한다.

스케줄러의 형태는 @Scheduled어노테이션을 붙이는 가장 기본적인 형태부터 Quartz까지 다양하게 정의될 수 있다.
본 글에서는 기본적 형태의 스케줄러를 적용하여 Batch를 알아볼 것이다.

2. Batch 아키텍처

Batch는 기본적으로 아래 사진처럼 구성되어 있다. 하나씩 살펴보자.

Job Scheduler

: 정의된 배치Job을 일정 주기마다 실행시켜주는 Scheduler 객체이다. 위에서 언급했던 것처럼 Batch와는 직접적인 연관이 없다.

Job Launcher

: 실제로 배치Job을 실행시켜주는 객체이다. Scheduler와 함께 사용한다면, Scheduler 객체 안에서 Job Launcher를 활용하여 배치Job을 실행시켜주는 방식으로 사용된다. Job Instance를 구분하기 위한 Job Parameter를 함께 입력할 수 있다.

Job

: 실제 데이터를 Batch로 처리하기 위한 작업 단위이다. 비유한다면, Job Launcher는 작업자이고 Job은 작업자에게 할당된 작업의 내용이라고 보면 된다.

Step

: 정의된 Job을 효율적이고 안정적으로 처리하기 위해 작업을 나눈 단위이다. Job을 요리에 비유한다면, 요리라는 Job은 재료준비-재료손질-조리라는 과정대로 진행될 것이고, 재료준비와 같은 각 순서들이 Step이라고 볼 수 있다.

Job Repository

: 정의된 Job의 처리를 위한 메타데이터가 저장되어있는 저장소이다. 해당 메타데이터는 위에서 언급한 Batch의 대용량 처리를 위한 기능(로깅/추적, 작업재시작, 건너뛰기 등)에 활용된다. 위에서 비유한 것처럼 Job이 요리라면, 나의 요리일지(?)나 미리 손질된 재료들의 정보들이 메타데이터이고 Job Repository에 기록된다고 보면 된다.

tasklet 방식

tasklet은 Spring Batch의 Step을 구현하는 기본 방식이다.

chunk 방식

chunk방식은 chunk 단위 만큼 Transaction을 수행하게 할 수 있는 방식이다.
step을 정의할 때, chunk의 수를 정할 수 있다. step이 한번 수행될 때마다 chunk 수만큼 transaction이 실행되므로
step 실행 도중 실패시, rollback도 chunk수만큼 수행된다.

chunk방식은 step이 아래 3개 모듈로 나눠서 실행된다.

ItemReader

: 데이터를 읽는 모듈이다. File을 읽을 수도 있고, DB를 읽을 수도 있다.

ItemProcessor

: 데이터를 처리하는 모듈이다. 데이터를 저장하기 전에 가공이 필요할 경우, 해당 모듈에서 진행하게 된다.
해당 모듈은 option이다.

ItemWriter

: 데이터를 저장하는 모듈이다. DB에 저장하거나 File로 남길 수 있다.

3. 예제

  • Job Scheduler
    : JobLauncher를 활용하여 주기적으로 job을 실행시켜주는 객체
@Configuration
@RequiredArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;
    private final ExampleJobConfig exampleJobConfig ;

    @Scheduled(cron = "1 * * * * *")
    public void jobScheduled() throws JobParametersInvalidException, JobExecutionAlreadyRunningException,
            JobRestartException, JobInstanceAlreadyCompleteException {

            // make job parameter
            Map<String, JobParameter> jobParametersMap = new HashMap<>();
            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
            Date time = new Date();
            String time1 = format1.format(time);
            jobParametersMap.put("date",new JobParameter(time1));
            JobParameters parameters = new JobParameters(jobParametersMap);

            // job start
            JobExecution jobExecution = jobLauncher.run(exampleJobConfig.exampleJob, parameters);

    }

}
  • Job (tasklet 방식)
    : 실제 실행되는 job 정의
@Configuration
@RequiredArgsConstructor
public class ExampleJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job exampleJob() {
        return jobBuilderFactory.get("exampleJob")
                .start(exampleStep())
                .build();
    }

    @Bean
    public Step exampleStep() {
        return stepBuilderFactory.get("exampleStep")
                .tasklet(new ExampleTasklet())
                .build();
    }

}
  • tasklet
@Slf4j
public class ExampleTasklet implements Tasklet, StepExecutionListener {

    @Override
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        log.info("Before Step");
    }

    @Override
    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {

        log.info("After Step");

        return ExitStatus.COMPLETED;
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

        //비즈니스 로직
        log.info("Business Logic");

        return RepeatStatus.FINISHED;
    }

}
  • Job (chunk 방식)
    : 실제 실행되는 job 정의, Reader, Processor, Writer를 각각 정의해주어야 한다. (외부 클래스로 빼도 됨)
@Configuration
@RequiredArgsConstructor
public class ExampleJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job exampleJob() {
        return jobBuilderFactory.get("exampleJob")
                .start(exampleStep())
                .build();
    }

    @Bean
    public Step exampleStep() {
        return stepBuilderFactory.get("exampleStep")
                .<Member,Member>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Member> reader() throws Exception {

        Map<String,Object> parameterValues = new HashMap<>();
        parameterValues.put("amount", "10000");

        //pageSize와 fethSize는 동일하게 설정
        return new JdbcPagingItemReaderBuilder<Member>()
                .pageSize(10)
                .fetchSize(10)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Member.class))
                .queryProvider(customQueryProvider())
                .parameterValues(parameterValues)
                .name("JdbcPagingItemReader")
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Member, Member> processor(){

        return new ItemProcessor<Member, Member>() {
            @Override
            public Member process(Member member) throws Exception {

                //1000원 추가 적립
                member.setAmount(member.getAmount() + 1000);

                return member;
            }
        };
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Member> writer(){
        return new JdbcBatchItemWriterBuilder<Member>()
                .dataSource(dataSource)
                .sql("UPDATE MEMBER SET AMOUNT = :amount WHERE ID = :id")
                .beanMapped()
                .build();

    }

    public PagingQueryProvider customQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();

        queryProviderFactoryBean.setDataSource(dataSource);

        queryProviderFactoryBean.setSelectClause("SELECT ID, NAME, EMAIL, NICK_NAME, STATUS, AMOUNT ");
        queryProviderFactoryBean.setFromClause("FROM MEMBER ");
        queryProviderFactoryBean.setWhereClause("WHERE AMOUNT >= :amount");

        Map<String,Order> sortKey = new HashMap<>();
        sortKey.put("id", Order.ASCENDING);

        queryProviderFactoryBean.setSortKeys(sortKey);

        return queryProviderFactoryBean.getObject();

    }

}

이어서

다음은 실제 진행했던 개발 내용과 겪었던 시행착오에 대해 다룰 것이다.

반응형

+ Recent posts