728x90
반응형

Spring

현재까지 File을 읽어서 DB에 저장하는 기본적인 Batch 로직을 구성하였다. 👉이전글 참고

동일 File 반복 Read 에러 발생

현상

Batch 동작을 통해 SFTP 기능 테스트를 진행하는 도중에 한 가지 오류를 발견했다.
아래처럼 의도한대로 기능이 동작하지 않는 것이다.

  • 의도
  1. 주기적으로 Batch Job이 실행되어 Read 대상의 File을 선정한다.
  2. Read할 파일의 파일명 끝에 _END가 붙어있으면 skip하고 다음 대상을 탐색한다.
  3. Read 대상파일 선정 후, Read 작업이 끝나면 해당 파일의 파일명 끝에 _END를 붙여준다.

파일명 변경 로직은 아직 개발되지 않아서 수동으로 파일명 변경하고 테스트를 반복하였다.

프로그램이 대상파일을 Read하면 해당 파일에 수동으로 _END를 붙여주고,
다음 Batch Job 동작 시, 다른 대상파일을 탐색하는지 확인하는 테스트를 진행하였다.
하지만, 프로그램은 다른 파일을 탐색하지 않고 기존에 Read했던 파일만 반복적으로 Read하였다.

같은 파일만 지속적으로 Read하는 것을 봤을 때, 배치 job이 Read할 File을 새롭게 갱신을 못하는 것 같았다.
배치 job 실행 때마다 Step Bean이 새로 생성될 수 있도록 초점을 맞추고 해결방안을 고민하였다.

@StepScope

@StepScope는 배치 job step의 tasklet이나 itemReader, itemWriter에 붙는 어노테이션이다.
해당 어노테이션이 붙은 Bean은 Spring 컨테이너 실행시점에 생성되는 것이 아닌, 해당 Step이 실행되는 시점에 생성된다.
(Spring Bean의 기본 Scope는 singleton이기 때문에 생성시점은 중요하다.)

  • 사용이유
    : Spring Batch의 Job은 기본적으로 실행될 때, JobParameters가 함께 입력되어 실행된다. 이 때, Job Instance를 구분하는 기준은 JobParameters이고, 중복 JobParameters의 Job은 생성되지 않는다.

하지만 배치 Job Bean 생성 시점은 Spring Container 생성 시점이기 때문에, Bean 생성작업은 한번만 실행된다. 그렇기 때문에 실행 Job은 JobParameters로 구분할 수 있겠지만, 동작은 항상 동일할 것이다.

이러한 Batch의 맹점을 보완한 것이 @StepScope@JobScope이고, JobParameters활용 시에는 해당 어노테이션 활용이 병행되어야 한다.

어떻게 보완했나?

나도 역시 위에서 언급한 Batch의 맹점을 발견하였다.
해당 맹점은 위에서 언급했다시피 배치 job동작 시, 동일 file만 반복적으로 Read하는 현상이다.

해당 현상은 배치 job의 초기화가 Spring Container 생성 시점에 진행되기 때문에,
초기화 시점에 선정된 대상 file만 반복적으로 Read하는 것이라고 판단했다. (=> 더 알아봐야할 듯.)

해당 현상을 개선하기 위해 아래 코드처럼 @StepScope어노테이션을 선언하였다.

  • ResultReader.java
@Configuration
@RequiredArgsConstructor
public class InterfaceReader {

    @Bean
    @StepScope // 어노테이션 추가
    public <T> FlatFileItemReader<T> resultFlatFileItemReader() throws IOException {

        String targetFile = null;
        List<String> findFileList = new ArrayList<String>();
        String rootDir = "/save/";
        String fileNamePrefix = "INTERFACE_";
        String fileFullname = rootDir + fileNamePrefix;

        FlatFileItemReader<T> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("UTF-8");

        try {

            // FileReader 설정
            setFileReaderConfiguration(Result.class, new ResultFieldSetMapper(), flatFileItemReader);

            Path dirPath = Paths.get(rootDir);
            Stream<Path> walk = Files.walk(dirPath, 1);
            List<Path> fileList = walk.collect(Collectors.toList());

            // Read 대상 파일 찾기
            for (Path path : fileList) {
                int extensionIdx = path.toString().lastIndexOf(".");
                if (path.toString().startsWith(fileFullname) && extensionIdx != -1 && !path.toString().substring(0, extensionIdx).endsWith("END")) {
                    targetFile = path.toString();
                    findFileList.add(path.toString());
                }
            }

            flatFileItemReader.setResource(new FileSystemResource(targetFile));

        }
        catch (NoSuchFileException e) {
            System.out.println(e.toString());
        }
        catch (NullPointerException e) {
            System.out.println(e.toString());
        }
        catch (Exception e) {
            System.out.println(e.toString());
        }

        return flatFileItemReader;
    }

    private <T> void setFileReaderConfiguration(Class<T> c, FieldSetMapper<T> mapClass, FlatFileItemReader<T> flatFileItemReader) {

        ... (중략)

    }

}

@StepScope어노테이션을 위 코드처럼 선언하게 되면,
해당 Step이 실행될 때마다 Bean 생성 과정을 반복하게 되어 Read 대상파일을 갱신하게 된다.

리팩토링

File to DB 개발은 총 세가지 종류의 파일을 읽기 위해 시작되었다.
그리고 파일종류마다 각각의 Batch Job과 Step, Reader 등을 구성하여 기능을 구현하였다.

그러다보니 중복되는 부분이 많았다. ( ItemReader로직 동일 )
컨셉은 File을 Read해서 DB에 저장하는 것으로 동일하니, 어쩔 수 없는 부분이었고 공통로직을 모듈화시켜서 효율성을 증대시키고 싶었다.

ItemReader를 공통모듈 형태로 따로 빼고, 객체형태로 Parameter를 받아서 실행시키기 위해
아래와 같이 코드를 리팩토링 하였다.

  • application.yml
ENV_POSTGRESQL: 127.0.0.1
ENV_POSTGRESQL_PORT: 5432
ENV_POSTGRESQL_DB: batch
ENV_POSTGRESQL_SCHEMA: daewoong
POSTGRESQL_USERNAME: postgres
POSTGRESQL_PASSWORD: postgres

DIR_DELIMITER: /
SFTP_ROOT: /sftp/
SFTP_DEST1: /sftp/1/
SFTP_DEST2: /sftp/2/
SFTP_DEST3: /sftp/3/

server:
    port: 8080

spring:
    main:
        allow-bean-definition-overriding: true

    datasource:
        url: jdbc:postgresql://${ENV_POSTGRESQL}:${ENV_POSTGRESQL_PORT}/${ENV_POSTGRESQL_DB}
        username: ${POSTGRESQL_USERNAME}
        password: ${POSTGRESQL_PASSWORD}

    jpa:
        hibernate:
            ddl-auto: create
        database-platform: org.hibernate.dialect.PostgreSQLDialect
        properties:
            hibernate:
                format_sql: true
                show_sql: true

5개의 환경변수를 추가하였다. ( DIR_DELIMITER, SFTP_ROOT, SFTP_DEST1, SFTP_DEST2, SFTP_DEST3 )

  1. DIR_DELIMITER : 디렉토리 path 구분자, window와 linux의 형태가 다르기 때문에 따로 입력. 파일처리에 활용
  2. SFTP_ROOT : Read 대상의 파일들이 있는 path
  3. SFTP_DEST : Read 후, 파일들이 저장되어야할 path. 파일처리에 활용

  • ReaderRequestDto.java
@Component
@Data
public class ReaderRequestDto {

    private String rootDir;                // 파일 저장 경로
    private String destDir;                // END파일 저장 경로
    private String fileNamePrefix;        // 파일이름 접두사
    private String delimiter;            // 구분자 => 파일데이터 구분, 디렉토리 path 구분자와 혼동x
    private String[] columnNames;        // 컬럼명
    private String targetFile;            // 읽은 파일

}

위와 같이 공통 모듈의 파라미터로 활용할 객체를 선언하였다.

  1. fileNamePrefix : 각 종류의 파일은 접두사로 구분되기 때문에 해당 값을 활용하여 대상 파일을 탐색하였다.
  2. columnNames : FlatFileItemReader는 파일데이터를 컬럼명에 알맞게 매핑하기 위해 따로 컬럼명을 입력받아 설정해주어야 한다. 이를 위해 필요한 값이다.
  3. targetFile : job이 한번 실행될 때, read된 파일이다. 파일처리에 필요한 Listener에서 활용하기 위해 필요한 값이다.

  • CommonReader.java
@Component
@RequiredArgsConstructor
public class CommonReader {

    private final ReaderRequestDto readerRequestDto;

    public <T> FlatFileItemReader<T> commonFlatFileItemReader(Class<T> c, FieldSetMapper<T> mapClass throws IOException {

        String targetFile = null;
        List<String> findFileList = new ArrayList<String>();
        String rootDir = readerRequestDto.getRootDir();                    // parameter 입력
        String fileNamePrefix = readerRequestDto.getFileNamePrefix();    // parameter 입력
        String fileFullname = rootDir + fileNamePrefix;

        FlatFileItemReader<T> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setEncoding("UTF-8");

        try {

            // FileReader 설정
            setFileReaderConfiguration(Result.class, new ResultFieldSetMapper(), flatFileItemReader);

            Path dirPath = Paths.get(rootDir);
            Stream<Path> walk = Files.walk(dirPath, 1);
            List<Path> fileList = walk.collect(Collectors.toList());

            // Read 대상 파일 찾기
            for (Path path : fileList) {
                int extensionIdx = path.toString().lastIndexOf(".");
                if (path.toString().startsWith(fileFullname) && extensionIdx != -1 && !path.toString().substring(0, extensionIdx).endsWith("END")) {
                    targetFile = path.toString();
                    findFileList.add(path.toString());
                }
            }
            readerRequestDto.setTargetFile(targetFile);        // parameter 세팅

            flatFileItemReader.setResource(new FileSystemResource(targetFile));

        }
        catch (NoSuchFileException e) {
            System.out.println(e.toString());
        }
        catch (NullPointerException e) {
            System.out.println(e.toString());
        }
        catch (Exception e) {
            System.out.println(e.toString());
        }

        return flatFileItemReader;

    }

    private <T> void setFileReaderConfiguration(Class<T> c, FieldSetMapper<T> mapClass, FlatFileItemReader<T> flatFileItemReader) {

        // File 내 구분자 설정
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer(readerRequestDto.getDelimiter());    // paramter 입력
        // 구분자 기준, 저장될 컬럼 이름 및 순서 지정
        delimitedLineTokenizer.setNames("NAME", "DATE", "SEQ", "STRING");

        // Mapper 설정
        DefaultLineMapper<T> defaultLineMapper = new DefaultLineMapper<>();
        BeanWrapperFieldSetMapper<T> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<>();
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        if (mapClass != null) {
            defaultLineMapper.setFieldSetMapper(mapClass);
        }
        else {
            beanWrapperFieldSetMapper.setTargetType(c);
            defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        }

        flatFileItemReader.setLineMapper(defaultLineMapper);

    }

}

기존과 다르게 위 코드에서 보이는 것처럼 객체 parameter로부터 대상파일을 탐색하기 위한 root와 fileName정보를 입력받고,
file read가 끝나면 대상 파일을 다시 객체 parameter에 세팅해주는 로직이 추가되었다.

parameter를 입력받는 모듈로 구성하기 위해 @Component어노테이션을 활용하여 ItemReader가 아닌 별도 모듈형태로 Bean을 생성하도록 하였다.

  • Sftp1Reader.java
@Configuration
@RequiredArgsConstructor
public class Sftp1Reader {

    private final CommonReader commonReader;
    private final ReaderRequestDto readerRequestDto;

    @Value("${SFTP_ROOT}")
    private String rootDir;

    @Value("${SFTP_DEST1}")
    private String destDir;

    @Bean
    @StepScope
    public FlatFileItemReader<Result> resultFlatFileItemReader() throws IOException {

        String[] columnNames = {"NAME", "DATE", "SEQ", "STRING"};
        readerRequestDto.setRootDir(rootDir);
        readerRequestDto.setDestDir(destDir);
        readerRequestDto.setFileNamePrefix("INTERFACE_");
        readerRequestDto.setDelimiter("|");
        readerRequestDto.setColumnNames(columnNames);

        return commonReader.commonFlatFileItemReader(Result.class, new ResultFieldSetMapper());
    }

}

위와 같이 Step을 재구성하여 공통 모듈인 commonReader에 parameter형태로 필요한 값이 입력될 수 있도록 리팩토링하였다.

그 결과, ItemReader 로직 수정이 필요할 시 공통로직 수정을 통해 빠르게 반영하여 생산성을 제고할 수 있었다.

Read 파일 처리기능 추가

앞서 언급한 것처럼 Read된 파일들은 뒤에 _END를 붙여야 하고, 각각의 디렉토리에 이동시켜 보관해야 한다.
해당 기능은 StepExecutionListener를 활용하여 구현하였다.

StepExecutionListner

Spring Batch는 배치 job의 동작을 보조하기 위한 여러 종류의 Listener를 제공한다.

  1. JobExecutionListener : Job 단계 전/후에 동작하는 Listener
  2. StepExecutionListener : Step 단계 전/후에 동작하는 Listener
  3. ChunkListener : Chunk 단계 전/후에 동작하는 Listener
  4. ItemReadListener : ItemReader 단계 전/후에 동작하는 Listener
  5. ItemProcessorListener : ItemProcessor 단계 전/후에 동작하는 Listener
  6. ItemWriteListener : ItemWriter 단계 전/후에 동작하는 Listener
  7. SkipListener : Skip이 발생한 경우 동작하는 Listener
  8. RetryListener : Retry 전/후에 동작하는 Listener

File 데이터를 읽고 DB에 성공적으로 저장되면 파일처리를 해야하기 때문에,
StepExecutionListener를 활용하였고 ItemWriter 동작이 끝나면 실행될 수 있도록 아래와 같이 코드를 수정하였다.
( CommonStepExecutionListener DI추가 및 interfaceStep 메소드 수정 )

  • InterfaceFileReaderBatchJobConfig.java
@Configuration
@RequiredArgsConstructor
public class InterfaceFileReaderBatchJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final InterfaceProcessor interfaceProcessor;
    private final InterfaceWriter interfaceWriter;

    private final CommonStepExecutionListener commonStepExecutionListener; // DI 추가

    private static final int chunkSize = 1000;

    @Bean
    public Job interfaceJob(Step interfaceStep) throws IOException {
        return jobBuilderFactory.get("interfaceJob")
                .start(interfaceStep)
                .build();
    }

    @Bean
    public Step interfaceStep(FlatFileItemReader<Result> resultFlatFileItemReader) throws IOException {
        return stepBuilderFactory.get("interfaceStep")
                .<Result, Result>chunk(chunkSize)
                .reader(resultFlatFileItemReader)
                .processor(interfaceProcessor)
                .writer(interfaceWriter)
                .listener(commonStepExecutionListener) // Listener 추가
                .build();
    }
}

그리고 아래와 같이 실질적인 파일처리를 위한 Listener로직을 구성하였다.

  • CommonStepExecutionListener.java
@Component
@RequiredArgsConstructor
public class CommonStepExecutionListener implements StepExecutionListener {

    private final ReaderRequestDto readerRequestDto;

    @Value("${DIR_DELIMITER}")
    private String dirDelimiter;

    @Override
    public void beforeStep(StepExecution stepExecution) {

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {

        if (readerRequestDto.getTargetFile() == null) {        // Reader 동작 시, 세팅된 targetFile 값
            return stepExecution.getExitStatus();
        }

        String totalSourceName = readerRequestDto.getTargetFile();    // 처리 대상의 파일명
        String destDir = readerRequestDto.getDestDir();                // 처리 후 이동될 경로
        int extensionIdx = totalSourceName.lastIndexOf(".");        // 확장자 바로 이전의 인덱스 값 확인
        String sourceName = totalSourceName.subString(0, extensionIdx);    // 확장자 제외 파일명
        int fileNameIdx = sourceName.lastIndexOf(dirDelimiter);
        String fileName = sourceName.subString(fileNameIdx + 1, sourceName.length());    // 경로데이터 제외 파일명
        String extension = totalSourceName.substring(extensionIdx, totalSourceName.length());    // 확장자

        String destName = destDir + fileName + "_END" + extension;        // 최종 파일명 (목적지 경로 포함)

        try {
            Path source = Paths.get(totalSourceName);
            Files.move(source, source.resolveSibling(destName));    // source -> dest 파일 이동
        }
        catch (Exception e) {

        }

        return stepExecution.getExitStatus();
    }

}

이어서

이로써 Spring Batch의 기능적인 부분은 모두 구현하였다.

하지만 예상치 못한 오류를 대비해야 했다.
서버문제로 인해 해당 프로그램이 잘 동작하지 않으면, File은 처리되지 않고 지속적으로 쌓이기만 할 것이다.
그렇기 때문에, File처리를 위한 Job을 실행시키는 Scheduler를 관리자 임의대로 runtime환경에서 설정할 수 있어야 했다.

이를 위해 Scheduler 기능을 추가 구현하였다.
Spring Batch 외의 내용이지만, 다음 글에서 추가로 다룰 것이다.

반응형

+ Recent posts