Framework/Spring-Batch

(7) Spring Batch Chunk

조훙 2022. 11. 16. 16:51

Chunk란?

  • Chunk란 여러 개의 아이템을 묶은 단위 또는 블럭
  • Spring Batch에서 데이터를 다루는 단위
  • ItemReader / ItemProcesser / ItemWriter 사이의 데이터를 전달하는 Item의 묶음.
  • 대용량 데이터를 한번에 처리 하는게 아닌, Chunk 단위로 사용하여 Commit, Rollback 등을 활용

Sample Code

@RequiredArgsConstructor
@Configuration
public class ChunkConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3","item4", "item5", "item6")))
                .processor(new ItemProcessor<String, String>() {
                    @Override
                    public String process(String item) throws Exception {
                        Thread.sleep(300);
                        System.out.println(item);
                        return "my_" + item;
                    }
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        Thread.sleep(1000);
                        System.out.println(items);
                    }
                })
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}

ChunkOrientedTasklet

Chunk 를 실제로 처리 하는 Tasklet은 ChunkOrientedTasklet 이라고 한다.

Tasklet의 구현체로써 TaskletStep에 의해 반복적으로 실행되며, ChunkOrientedTasklet이 호출 될때마다 Transaction이 생성된다.

StepBuilderFactory 예시

public Step chunkStep() {
        return stepBuilderFactory.get(“chunkStep")
 	.<I, O>chunk(10)                                     // Chunk Size 설정. Commit Interval 설정
	.<I, O>chunk(CompletionPolicy)                       // Chunk 프로세스 완료하기 위한 정책 설정 클래스
    .reader(itemReader())                                  // ItemReader 구현체 지정
    .writer(itemWriter())                                  // ItemWriter 구현체 지정
    .processor(itemProcessor())                            // ItemProcessor 구현체
    .stream(ItemStream())                                  // 재시작 데이터를 관리하는 콜백에 대한 스트림 
    .readerIsTransactionalQueue()                          // Item이 JMS와 같은 트랜잭션 외부에서 읽혀지는지에 대한 설정. Default는 False
    .listener(ChunkListener)                               // Chunk 프로세스가 진행되는 특정 시점에 콜백 제공받도록 하는 Listener 등록
    .build();
}

ChunkProvider

  • ItemReader 를 사용해서 소스로부터 아이템을 Chunk size 만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체
  • Chunk<I> 생성 -> ItemReader.read() 를 계속 호출하면서 item 을 Chunk에 입력
  • 외부로 부터 ChunkProvider 가 호출될 때마다 항상 새로운 Chunk 가 생성

구현체

  • SimpleChunkProvider
  • FaultTolerantChunkProvider

ChunkProcessor

  • ItemProcessor 를 사용해서 Item 을 변형, 가공, 필터링하고 ItemWriter 를 사용해서 Chunk 데이터를 저장, 출력한다
  • Chunk<O> 를 만들고 앞에서 넘어온 Chunk<I> 의 item 을 한 건씩 처리한 후 Chunk<O> 에 저장
  • 외부로 부터 ChunkProcessor 가 호출될 때마다 항상 새로운 Chunk 가 생성

Reference