728x90
반응형

Spring

비대면 고객케어 솔루션 개발 도중 타 모듈로부터 연동데이터를 File로 받아야 하는 요구사항이 들어왔다.
하루 평균 2~3만건의 데이터가 연동되고 이후 데이터량이 증가될 것을 고려하여 Spring Batch를 활용하기로 했다.

구체적인 개발 내용은 타 모듈로부터 File형태로 데이터들을 연동받으면,
Spring Batch가 주기적으로 연동받은 File을 읽어서 PostgreSQL DBdp 저장하는 것이다.

개발을 위해 Spring Batch에 대해 공부해야 했다. 👉Spring Batch란?

초기 환경설정

  • 버전 정보
    JDK 1.8, Spring Boot 2.6.4

개발 시작 전, Batch 환경을 위한 설정을 추가하였다.
(아래 코드들은 개발 시 실제 작성했던 코드와 네이밍을 다르게 하여 작성했다.)

  • application.yml
ENV_POSTGRESQL: 127.0.0.1
ENV_POSTGRESQL_PORT: 5432
ENV_POSTGRESQL_DB: batch
ENV_POSTGRESQL_SCHEMA: daewoong
POSTGRESQL_USERNAME: postgres
POSTGRESQL_PASSWORD: postgres

server:
    port: 8080

spring:
    main:
        allow-bean-definition-overriding: true

    datasource:
        url: jdbc:postgresql://${ENV_POSTGRESQL}:${ENV_POSTGRESQL_PORT}/${ENV_POSTGRESQL_DB}
        username: ${POSTGRESQL_USERNAME}
        password: ${POSTGRESQL_PASSWORD}

    jpa:
        hibernate:
            ddl-auto: create
        database-platform: org.hibernate.dialect.PostgreSQLDialect
        properties:
            hibernate:
                format_sql: true
                show_sql: true

위 설정을 간략히 요약하면,

  1. DB 연결을 위한 설정, jdbc:postgresql://127.0.0.1:5432/batch, postgres/postgres
  2. AP서버 port : 8080
  3. allow-bean-definition-overriding: true >> 수동생성 빈과 자동생성 빈의 중복문제 방지
  4. ddl-auto: create >> AP서버 부팅 시, DB 초기화
  5. database-platform: org.hibernate.dialect.PostgreSQLDialect >> PostgreSQL 방언 설정
  6. log에 SQL형태로 SQL을 나타냄.

그리고 아래와 같이 main클래스에 @EnableBatchProcessing어노테이션을 붙이면 공통 Batch 설정이 마무리 된다.
(@EnableScheduling어노테이션은 Batch와 직접적인 연관은 없지만, Batch Job을 주기적으로 실행시키기 위해 선언되어야 한다.)

  • InterfaceApplication.java
@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class InterfaceApplication {

    public static void main(String[] args) {
        SpringApplication.run(InterfaceApplication.class, args);
    }

}

배치 job 구현

Spring Batch 구현방식은 Chunk-Oriented방식을 활용하였다.
전달받는 파일은 아래 format으로 구성되어 있다.
파일 상의 1개 row는 DB에 저장될 row이고, |문자를 구분자로 하여 4개 컬럼에 나눠 저장될 것이다. 컬럼 지정은 AP 코드 상에서 진행된다.

  • INTERFACE_202304182300.txt
AAA|2023-04-17|112233|test1
BBB|2023-04-17|123123|test2
CCC|2023-04-17|321321|test3
DDD|2023-04-18|332211|test4

이제 위 파일을 읽어서 DB에 저장할 배치job을 정의해야 한다.
아래는 스케줄러로 실행시킬 배치 job의 config이다.

  • InterfaceFileReaderBatchJobConfig.java
@Configuration
@RequiredArgsConstructor
public class InterfaceFileReaderBatchJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
      private final StepBuilderFactory stepBuilderFactory;

    private final InterfaceProcessor interfaceProcessor;
    private final InterfaceWriter interfaceWriter;

    private static final int chunkSize = 1000;

    @Bean
    public Job interfaceJob(Step interfaceStep) throws IOException {
        return jobBuilderFactory.get("interfaceJob")
                .start(interfaceStep)
                .build();
    }

    @Bean
    public Step interfaceStep(FlatFileItemReader<Result> resultFlatFileItemReader) throws IOException {
        return stepBuilderFactory.get("interfaceStep")
                .<Result, Result>chunk(chunkSize)
                .reader(resultFlatFileItemReader)
                .processor(interfaceProcessor)
                .writer(interfaceWriter)
                .build();
    }
}

위 config는 핵심 객체인 Job객체와 Job이 실행시킬 Step 객체를 생성하기 위한 설정이다.

Chunk-Oriented방식에서 Step은 데이터를 읽는 ItemReader, 데이터 처리 ItemProcessor, 데이터 쓰기 ItemWriter 3개 역할로 나눠서 수행된다.
chunkSize는 임의대로 1000으로 설정하였다. Result객체 단위로 파일에서 1000개 row씩 읽어서 데이터를 처리한다.
ItemReader의 대상은 File이기때문에 FlatFileItemReader객체를 활용하여 데이터를 파싱한다.

  • InterfaceReader.java
@Component
@RequiredArgsConstructor
public class InterfaceReader {

    public FlatFileItemReader<Result> resultFlatFileItemReader() throws IOException {

        String targetFile = null;
        List<String> findFileList = new ArrayList<String>();
        String rootDir = "/save/";
        String fileNamePrefix = "INTERFACE_";
        String fileFullname = rootDir + fileNamePrefix;

        FlatFileItemReader<Result> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("UTF-8");

        try {

            // FileReader 설정
            setFileReaderConfiguration(Result.class, new ResultFieldSetMapper(), flatFileItemReader);

            Path dirPath = Paths.get(rootDir);
            Stream<Path> walk = Files.walk(dirPath, 1);
            List<Path> fileList = walk.collect(Collectors.toList());

            // Read 대상 파일 찾기
            for (Path path : fileList) {
                int extensionIdx = path.toString().lastIndexOf(".");
                if (path.toString().startsWith(fileFullname) && extensionIdx != -1 && !path.toString().substring(0, extensionIdx).endsWith("END")) {
                    targetFile = path.toString();
                    findFileList.add(path.toString());
                }
            }

            flatFileItemReader.setResource(new FileSystemResource(targetFile));

        }
        catch (NoSuchFileException e) {
            System.out.println(e.toString());
        }
        catch (NullPointerException e) {
            System.out.println(e.toString());
        }
        catch (Exception e) {
            System.out.println(e.toString());
        }

        return flatFileItemReader;
    }

    private <T> void setFileReaderConfiguration(Class<T> c, FieldSetMapper<T> mapClass, FlatFileItemReader<T> flatFileItemReader) {

        // File 내 구분자 설정
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer("|");
        // 구분자 기준, 저장될 컬럼 이름 및 순서 지정
        delimitedLineTokenizer.setNames("NAME", "DATE", "SEQ", "STRING");

        // Mapper 설정
        DefaultLineMapper<T> defaultLineMapper = new DefaultLineMapper<>();
        BeanWrapperFieldSetMapper<T> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        if (mapClass != null) {
            defaultLineMapper.setFieldSetMapper(mapClass);
        }
        else {
            beanWrapperFieldSetMapper.setTargetType(c);
            defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        }

        flatFileItemReader.setLineMapper(defaultLineMapper);

    }

}

위 코드를 보면 파일에서 읽은 데이터를 객체에 알맞게 매핑하여 넣어줄 용도로 ResultFieldSetMapper Mapper클래스를 활용한다.
해당 클래스는 커스터마이징하여 임의로 정의한 Mapper클래스이다.
default Mapper클래스가 아닌 custom Mapper클래스를 활용하는 이유는 데이터를 받을 클래스는 Entity클래스이고, 날짜 형식이나 Not null조건과 같이 Entity 필드 조건에 맞게 데이터가 입력되어야 하기 때문에 default Mapper를 활용하면 Mapping error가 발생한다.
해당 error를 방지하기 위해 날짜형식을 가공하여 데이터를 Mapping시켜주는 Custom Mapper클래스를 정의하여 활용하였다.

아래는 해당 Mapper클래스와 Entity객체이다.

  • ResultFieldSetMapper.java
public class ResultFieldSetMapper implements FieldSetMapper<Result> {

    public Result mapFieldSet(FieldSet fieldSet) {

        Result result = new Result();

        if (fieldSet == null) {
            return null;
        }

        // 읽은 데이터의 date 형식과 entity date형식의 차이로 인한 오류를 방지
        // 아래 정의된 형식대로 데이터가 들어오지 않아 Exception이 발생하여 데이터 저장에 오류를 발생하는 것을 방지하기 위해 catch문을 통한 null입력 구문 추가
        try {
            result.setDate(fieldSet.readDate("DATE", "yyyy-MM-dd"))
        } catch {
            result.setDate(null);
        }

        result.setName(fieldSet.readString("NAME"));
        result.setSeq(fieldSet.readString("SEQ"));
        result.setString(fieldSet.readString("STRING"));

    }

}
  • Result.java
@Table(name = "tb_result")
@Data
@NoArgsConstructor(access = AccessLevel.PUBLIC)
@Entity
@IdClass(ResultId.class)
public class Result implements Serializable {

    @Id
    @Column(name = "name", length = 20, nullable = false)
    private String name;

    @Id
    @Column(name = "seq", length = 10, nullable = false)
    private String seq;

    @Column(name = "date")
    @DateTimeFormat(pattern = "yyyy-MM-dd")
    private Date date;

    @Column(name = "string", length = 20)
    private String string;

}
  • ResultId.java
public class ResultId implements Serializable {

    @Column(name = "name", length = 20, nullable = false)
    private String name;

    @Column(name = "seq", length = 10, nullable = false)
    private String seq;

}

복수 개의 ID 컬럼이 있을 경우, 위와 같이 Serializable인터페이스를 상속 받아 ID클래스를 별도로 정의하여 설정한다.

위 Job을 통해 데이터를 읽고, 아래 ResultProcessorResultWriter 객체를 통해 데이터를 처리하여 DB에 저장한다.

  • ResultProcessor.java
@Configuration
public class ResultProcessor implements ItemProcessor<Result, Result> {

    public Result process(Result result) throws Exception {
        // Result Entity 객체로 읽은 데이터의 추가 처리가 필요할 시, 해당 메소드에 처리 로직을 구성하면 된다.
        return result;
    }

}
  • ResultWriter.java
@Configuration
@RequiredArgsConstructor
public class ResultWriter implements ItemWriter<Result> {

    private final ResultRepository resultRepository;

    @Override
    public void write(List<? extends Result> list) throws Exception {

        resultRepository.saveAll(new ArrayList<Result>(list));

    }

}
  • ResultRepository.java
public interface ResultRepository extends JpaRepository<Result, Long> {

}

이제 Job 정의는 다 끝났다.

위와 같이 정의된 Job을 주기적으로 실행하기 위해 Scheduler를 정의한다.

정의된 job InterfaceFileReaderBatchJobConfig을 주입받고
주입받은 Job을 JobLauncher를 활용하여 실행하는데, Job 실행단위인 JobInstance 간의 차이를 두기 위해
JobParameters를 JobLauncher에 job과 같이 parameter로 넘겨준다.

그리고 해당 JobLauncher를 Scheduler로 실행하는 형태로 구성하여
최종적으로 일정주기마다 배치 job이 실행되는 형태를 구성할 수 있다.

  • JobScheduler.java
@Component
@RequiredArgsConstructor
public class JobScheduler {

    private final JobLauncher jobLauncher;

    private final InterfaceFileReaderBatchJobConfig interfaceFileReaderBatchJobConfig;

    private final InterfaceReader interfaceReader;

    // 스케줄러 주기 설정
    @Scheduled(cron = "0 * * * * *")
    public void runJob() throws IOException {

        // JobParameters를 실시간 시간으로 설정
        Map<String, JobParameter> confMap = new HashMap<>();
        confMap.put("time", new JobParameter(System.currentTimeMillis()));
        JobParameters jobParameters = new JobParameters(confMap);

        try {

            // 배치 job 실행
            jobLauncher.run(interfaceFileReaderBatchJobConfig.interfaceJob(
                interfaceFileReaderBatchJobConfig.interfaceStep(interfaceReader)), jobParameters
            );

        } catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException | JobParametersInvalidException | JobRestarException e) {
            System.out.println(e.getMessage())
        }

    }

}

이어서

지금까지 File to DB 기능을 구현하기 위한 기본적인 개발 과정을 거쳤다.
하지만 완벽한 기능 구현까지의 몇 차례의 시행착오가 있었고,
Batch의 추가적인 기능이 필요했다.

다음 글에서는 앞서 겪었던 시행착오와 Batch 추가기능을 통해 File 처리하는 개발과정을 기록할 것이다.

반응형

+ Recent posts