본문 바로가기
Framework/Spring-Batch

(9) Spring Batch ItemProcessor

by 조훙 2022. 11. 16.

ItemProcessor

  • Job이 처리하는 Item에 Business Logic을 적용
  • 사용자가 Business Logic을 개발하고, 필요에 맞게 로직을 적용 할 수 있는 Interface
  • ItemReader를 통해 입력 받은 데이터의 유효성 검사를 할 수도 있음
  • 스크립트를 활용하여 데이터 유효성 검사도 가능
  • ItemProcessor<I,O> 형태로 제공
    • I: ItemReader에서 생성된 Item 형태
    • O: ItemWriter에서 사용할 Item 형태
  • null 을 반환할 경우 해당 Item에 대한 이후 처리가 모두 중지된다. (해당 Item은 필터링되어 ItemWriter로 전달되지 않으며, Job 자체가 중단되는 것은 아니다.)

Annotation을 활용한 Validation

Annotation 예시

/**
 * Customer item with seven string fields, each annotated with Bean Validation
 * constraints.
 *
 * <p>Offers a no-argument constructor, a copy constructor, plain accessors for
 * every field, and a {@link #toString()} that lists all field values.</p>
 */
public class Customer {

	@NotNull(message="First name is required")
	@Pattern(regexp="[a-zA-Z]+", message="First name must be alphabetical")
	private String firstName;

	@Size(min=1, max=1)
	@Pattern(regexp="[a-zA-Z]", message="Middle initial must be alphabetical")
	private String middleInitial;

	@NotNull(message="Last name is required")
	@Pattern(regexp="[a-zA-Z]+", message="Last name must be alphabetical")
	private String lastName;

	@NotNull(message="Address is required")
	@Pattern(regexp="[0-9a-zA-Z\\. ]+")
	private String address;

	@NotNull(message="City is required")
	@Pattern(regexp="[a-zA-Z\\. ]+")
	private String city;

	@NotNull(message="State is required")
	@Size(min=2,max=2)
	@Pattern(regexp="[A-Z]{2}")
	private String state;

	@NotNull(message="Zip is required")
	@Size(min=5,max=5)
	@Pattern(regexp="\\d{5}")
	private String zip;

	/** Creates a customer with every field left unset. */
	public Customer() {
	}

	/**
	 * Creates a customer whose fields are copied from another instance.
	 *
	 * @param original the customer to copy; its getters are read once each
	 */
	public Customer(Customer original) {
		this.firstName = original.getFirstName();
		this.middleInitial = original.getMiddleInitial();
		this.lastName = original.getLastName();
		this.address = original.getAddress();
		this.city = original.getCity();
		this.state = original.getState();
		this.zip = original.getZip();
	}

	// ---- name ----

	public String getFirstName() {
		return this.firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getMiddleInitial() {
		return this.middleInitial;
	}

	public void setMiddleInitial(String middleInitial) {
		this.middleInitial = middleInitial;
	}

	public String getLastName() {
		return this.lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	// ---- postal address ----

	public String getAddress() {
		return this.address;
	}

	public void setAddress(String address) {
		this.address = address;
	}

	public String getCity() {
		return this.city;
	}

	public void setCity(String city) {
		this.city = city;
	}

	public String getState() {
		return this.state;
	}

	public void setState(String state) {
		this.state = state;
	}

	public String getZip() {
		return this.zip;
	}

	public void setZip(String zip) {
		this.zip = zip;
	}

	/**
	 * Renders all fields as {@code Customer{name='value', ...}}.
	 *
	 * @return the textual form of this customer
	 */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("Customer{");
		text.append("firstName='").append(firstName).append('\'');
		text.append(", middleInitial='").append(middleInitial).append('\'');
		text.append(", lastName='").append(lastName).append('\'');
		text.append(", address='").append(address).append('\'');
		text.append(", city='").append(city).append('\'');
		text.append(", state='").append(state).append('\'');
		text.append(", zip='").append(zip).append('\'');
		text.append('}');
		return text.toString();
	}
}

BeanValidatingItemProcessor 활용 예시

/**
 * Job configuration that reads {@code Customer} items from a delimited file,
 * passes them through a {@code BeanValidatingItemProcessor}, and prints each
 * written item to standard output.
 *
 * <p>NOTE(review): no class-level Spring annotations are visible on this
 * snippet; confirm the real class carries whatever is needed for the
 * {@code @Bean} methods and {@code @Autowired} fields to be processed.</p>
 */
public class ValidationJob {

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	/**
	 * Step-scoped reader for the delimited customer file.
	 *
	 * @param inputFile resource resolved from the {@code customerFile} job parameter
	 * @return a reader that maps each line onto a {@code Customer}
	 */
	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerItemReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		// Column names, in file order; they match the Customer property names.
		String[] columns = new String[] {
				"firstName", "middleInitial", "lastName",
				"address", "city", "state", "zip"};

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerItemReader")
				.delimited()
				.names(columns)
				.targetType(Customer.class)
				.resource(inputFile)
				.build();
	}

	/**
	 * Writer that prints every item of the chunk with {@code System.out.println}.
	 */
	@Bean
	public ItemWriter<Customer> itemWriter() {
		return items -> {
			for (Customer customer : items) {
				System.out.println(customer);
			}
		};
	}

	/**
	 * Processor instance used by {@link #copyFileStep()}.
	 */
	@Bean
	public BeanValidatingItemProcessor<Customer> customerValidatingItemProcessor() {
		BeanValidatingItemProcessor<Customer> processor = new BeanValidatingItemProcessor<>();
		return processor;
	}

	/**
	 * Chunk-oriented step (chunk size 5) wiring reader, processor and writer.
	 */
	@Bean
	public Step copyFileStep() {
		// The null passed to customerItemReader is a placeholder argument —
		// presumably the step-scoped bean receives the real resource at runtime; verify.
		return stepBuilderFactory.get("copyFileStep")
				.<Customer, Customer>chunk(5)
				.reader(customerItemReader(null))
				.processor(customerValidatingItemProcessor())
				.writer(itemWriter())
				.build();
	}

	/**
	 * Job named {@code job} consisting of the single {@link #copyFileStep()}.
	 */
	@Bean
	public Job job() throws Exception {
		return jobBuilderFactory.get("job")
				.start(copyFileStep())
				.build();
	}

	/**
	 * Launches the application, passing the customer file location as an argument.
	 */
	public static void main(String[] args) {
		SpringApplication.run(ValidationJob.class, "customerFile=/input/customer.csv");
	}
}

Custom Validation 기능 예시

/**
 * Source file: UniqueLastNameValidator.java
 *
 * <p>Validator that rejects a {@code Customer} whose last name has already been
 * seen by this instance. The set of seen names is written to and read back from
 * the {@code ExecutionContext} in {@link #update} and {@link #open}.</p>
 */
public class UniqueLastNameValidator extends ItemStreamSupport implements Validator<Customer> {

	private Set<String> lastNames = new HashSet<>();

	/**
	 * Records the customer's last name, failing if it was recorded before.
	 *
	 * @param value the customer to check
	 * @throws ValidationException if the last name is already in the set
	 */
	@Override
	public void validate(Customer value) throws ValidationException {
		String candidate = value.getLastName();

		// Set.add reports false when the element was already present,
		// so the membership test and the insertion happen in one call.
		boolean firstOccurrence = this.lastNames.add(candidate);
		if (!firstOccurrence) {
			throw new ValidationException("Duplicate last name was found: " + candidate);
		}
	}

	/**
	 * Stores the current set of last names in the execution context.
	 */
	@Override
	public void update(ExecutionContext executionContext) {
		String key = getExecutionContextKey("lastNames");
		executionContext.put(key, this.lastNames);
	}

	/**
	 * Replaces the set of last names with the one found in the execution
	 * context, if an entry exists under this validator's key.
	 */
	@Override
	@SuppressWarnings("unchecked") // the entry under this key is the Set<String> stored by update()
	public void open(ExecutionContext executionContext) {
		String key = getExecutionContextKey("lastNames");

		if (!executionContext.containsKey(key)) {
			return;
		}
		this.lastNames = (Set<String>) executionContext.get(key);
	}
}

/**
 * Source file: ValidationJob.java
 *
 * <p>Job configuration that reads {@code Customer} items from a delimited file,
 * validates them with the custom {@code UniqueLastNameValidator}, and prints
 * each written item to standard output.</p>
 *
 * <p>Only one {@code customerValidatingItemProcessor()} bean method is declared:
 * two methods with the same name and parameter list cannot coexist in one
 * class, and this configuration uses the custom validator.</p>
 */
public class ValidationJob {


	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	/**
	 * Step-scoped reader for the delimited customer file.
	 *
	 * @param inputFile resource resolved from the {@code customerFile} job parameter
	 * @return a reader that maps each line onto a {@code Customer}
	 */
	@Bean
	@StepScope
	public FlatFileItemReader<Customer> customerItemReader(
			@Value("#{jobParameters['customerFile']}")Resource inputFile) {

		return new FlatFileItemReaderBuilder<Customer>()
				.name("customerItemReader")
				.delimited()
				.names(new String[] {"firstName",
						"middleInitial",
						"lastName",
						"address",
						"city",
						"state",
						"zip"})
				.targetType(Customer.class)
				.resource(inputFile)
				.build();
	}

	/**
	 * Writer that prints every item of the chunk with {@code System.out.println}.
	 */
	@Bean
	public ItemWriter<Customer> itemWriter() {
		return (items) -> items.forEach(System.out::println);
	}

	/**
	 * The custom validator, named {@code validator}; the name is used when it
	 * builds its execution-context key.
	 */
	@Bean
	public UniqueLastNameValidator validator() {
		UniqueLastNameValidator uniqueLastNameValidator = new UniqueLastNameValidator();

		uniqueLastNameValidator.setName("validator");

		return uniqueLastNameValidator;
	}

	/**
	 * Processor that delegates validation of each item to {@link #validator()}.
	 */
	@Bean
	public ValidatingItemProcessor<Customer> customerValidatingItemProcessor() {
		return new ValidatingItemProcessor<>(validator());
	}

	/**
	 * Chunk-oriented step (chunk size 5) wiring reader, processor and writer.
	 */
	@Bean
	public Step copyFileStep() {

		return this.stepBuilderFactory.get("copyFileStep")
				.<Customer, Customer>chunk(5)
				.reader(customerItemReader(null))
				.processor(customerValidatingItemProcessor())
				.writer(itemWriter())
				// Register the validator as a stream so its open/update callbacks
				// take part in the step's execution-context handling.
				.stream(validator())
				.build();
	}

	/**
	 * Job named {@code job} consisting of the single {@link #copyFileStep()}.
	 */
	@Bean
	public Job job() throws Exception {

		return this.jobBuilderFactory.get("job")
				.start(copyFileStep())
				.build();
	}

	/**
	 * Launches the application, passing the customer file location as an argument.
	 */
	public static void main(String[] args) {
		SpringApplication.run(ValidationJob.class, "customerFile=/input/customer.csv");
	}
}

ItemProcessorAdapter 예시

  • 만약 UpperCaseNameService.upperCase(Customer customer) 라는 로직이 이미 있을 경우
// Declares a chunk-oriented step named "step" with a chunk size of 5 whose
// processor is the result of itemProcessor(null).
// The reader and writer arguments are elided ("...") in this example.
@Bean
public Step step() {
  return this.stepBuilderFactory.get("step")
      .<Customer, Customer>chunk(5)
      .reader(...)
      // null is only a placeholder argument here — presumably the container
      // supplies the real service when resolving the bean; verify.
      .processor(itemProcessor(null))
      .writer(...)
      .build();
}

/**
 * Adapts an existing {@code UpperCaseNameService} so that its
 * {@code upperCase} method is invoked as the item processor.
 *
 * @param service the object whose {@code upperCase} method handles each item
 * @return an adapter targeting {@code service} and its {@code upperCase} method
 */
@Bean
public ItemProcessorAdapter<Customer, Customer> itemProcessor(UpperCaseNameService service) {
  ItemProcessorAdapter<Customer, Customer> adapter = new ItemProcessorAdapter<>();
  // The delegate object and method name are configured through the setters.
  adapter.setTargetObject(service);
  adapter.setTargetMethod("upperCase");
  return adapter;
}

ScriptItemProcessor 예시

  • 이름을 대문자로 만드는 Javascript 로직 수행
/**
 * Builds a step-scoped {@code ScriptItemProcessor} configured with a script resource.
 *
 * @param script resource resolved from the {@code script} job parameter
 * @return the processor with that script set
 */
@Bean
@StepScope
public ScriptItemProcessor<Customer, Customer> itemProcessor(
  @Value("#{jobParameters['script']}") Resource script
) {
  final ScriptItemProcessor<Customer, Customer> scriptProcessor = new ScriptItemProcessor<>();

  scriptProcessor.setScript(script);

  return scriptProcessor;
}

CompositeItemProcessor 예시

  • 여러개의 ItemProcessor를 Chain 형태로 수행 해야 할 때 사용
  • 사용 조건:
    • 이전 프로세서가 반환한 타입과 다음 프로세서의 입력 타입이 같아야 한다
    • 한 프로세서가 null을 반환하면 해당 아이템은 더이상 처리되지 않는다
/**
 * Builds a {@code CompositeItemProcessor} whose delegates are
 * {@code itemProcessor1()}, {@code itemProcessor2()} and
 * {@code itemProcessor3()}, in that order.
 *
 * @return the composite processor with its delegate list set
 */
@Bean
public CompositeItemProcessor<Customer, Customer> itemProcessor() {
  // Diamond instead of the raw type, so the assignment is checked by the compiler.
  CompositeItemProcessor<Customer, Customer> itemProcessor = new CompositeItemProcessor<>();
  itemProcessor.setDelegates(Arrays.asList(
    itemProcessor1(), itemProcessor2(), itemProcessor3()
  ));
  return itemProcessor;
}

ClassifierCompositeItemProcessor 예시

  • 각각의 ItemReader를 통해 넘어온 item 데이터에 따라 다른 ItemProcessor를 활용 하고 싶을 때 사용
  • ex) 우편번호가 홀수인지 짝수인지에 따라 서로 다른 ItemProcessor에 item을 전달하여 처리
/**
 * Picks the {@code ItemProcessor} for a customer according to whether the
 * customer's zip, parsed as an integer, is even or odd.
 *
 * <p>The constructor comes from {@code @AllArgsConstructor}; presumably its
 * parameters follow field declaration order (odd processor first, then even) —
 * keep the field order stable for existing callers.</p>
 */
@AllArgsConstructor
public class ZipCodeClassifier implements Classifier<Customer, ItemProcessor<Customer, Customer>> {

  private ItemProcessor<Customer, Customer> oddItemProcessor;
  private ItemProcessor<Customer, Customer> evenItemProcessor;

  /**
   * Returns the even processor for an even zip and the odd processor otherwise.
   *
   * @param classifiable the customer whose zip decides the route
   * @return {@code evenItemProcessor} or {@code oddItemProcessor}
   * @throws NumberFormatException if the zip cannot be parsed as an integer
   */
  @Override
  public ItemProcessor<Customer, Customer> classify(Customer classifiable) {
    // Customer exposes its zip through getZip(); it declares no getZipCode().
    int zip = Integer.parseInt(classifiable.getZip());
    if (zip % 2 == 0) {
      return evenItemProcessor;
    } else {
      return oddItemProcessor;
    }
  }
}

/**
 * Builds a {@code ClassifierCompositeItemProcessor} that uses the
 * classifier returned by {@code classifier()}.
 *
 * @return the processor with its classifier set
 */
@Bean
public ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor() {
  final ClassifierCompositeItemProcessor<Customer, Customer> routingProcessor =
      new ClassifierCompositeItemProcessor<>();

  routingProcessor.setClassifier(classifier());

  return routingProcessor;
}

/**
 * Creates the {@code ZipCodeClassifier}, passing the upper-case processor as
 * the first constructor argument and the lower-case processor as the second.
 *
 * <p>NOTE(review): the return type is left raw as in the original; a
 * parameterized type may not match {@code setClassifier}'s signature — confirm
 * before changing it.</p>
 */
@Bean
public Classifier classifier() {
  // null is a placeholder argument for each bean method call.
  ItemProcessor<Customer, Customer> first = upperCaseItemProcessor(null);
  ItemProcessor<Customer, Customer> second = lowerCaseItemProcessor(null);
  return new ZipCodeClassifier(first, second);
}

CustomItemProcessor 예시

  • ItemProcessor Interface 구현체를 직접 생성
  • return 값이 null 일 경우 해당 item을 Filter 처리 (ItemWriter로 전달되지 않으며, 예외로 인한 Skip과는 별도로 FILTER_COUNT에 집계된다)
 /**
  * Item processor that returns {@code null} for customers whose zip, parsed
  * as an integer, is even, and returns the item unchanged otherwise.
  */
 public class EvenFilteringItemProcessor implements ItemProcessor<Customer, Customer> {
  /**
   * @param item the customer to inspect
   * @return {@code null} when the zip is even, otherwise {@code item}
   * @throws NumberFormatException if the zip cannot be parsed as an integer
   */
  @Override
  public Customer process(Customer item) throws Exception {
    // Customer exposes its zip through getZip(); it declares no getZipCode().
    return Integer.parseInt(item.getZip()) % 2 == 0 ? null : item;
  }
}
  • ItemProcessor 의 처리 결과를 조회하여 확인이 가능. (FILTER_COUNT도 확인 가능)
-- Lists the status and the commit/read/filter/write counts of each step execution.
SELECT
    STEP_EXECUTION_ID AS ID,
    STEP_NAME,
    STATUS,
    COMMIT_COUNT,
    READ_COUNT,
    FILTER_COUNT,
    WRITE_COUNT
FROM BATCH_STEP_EXECUTION;

Reference

'Framework > Spring-Batch' 카테고리의 다른 글

(10-1) Spring Batch FlatFileItemWriter  (0) 2022.11.16
(10) Spring Batch ItemWriter  (0) 2022.11.16
(8-1) Spring Batch FlatFileItemReader  (0) 2022.11.16
(8) Spring Batch ItemReader  (0) 2022.11.16
(7) Spring Batch Chunk  (0) 2022.11.16