ItemProcessor
- Job이 처리하는 Item을 Business Logic에 적용
- 사용자가 Business Logic을 개발하고, 필요에 맞게 로직을 적용 할 수 있는 Interface
- ItemReader를 통해 입력 받은 데이터의 유효성 검사를 할 수도 있음
- 스크립트를 활용하여 데이터 유효성 검사도 가능
- ItemProcessor<I,O> 형태로 제공
- I: ItemReader에서 생성된 Item 형태
- O: ItemWriter에서 사용할 Item 형태
- null 을 반환할 경우 모든 작업이 중지 된다.
Annotation을 활용한 Validation
Annotation 예시
public class Customer {
@NotNull(message="First name is required")
@Pattern(regexp="[a-zA-Z]+", message="First name must be alphabetical")
private String firstName;
@Size(min=1, max=1)
@Pattern(regexp="[a-zA-Z]", message="Middle initial must be alphabetical")
private String middleInitial;
@NotNull(message="Last name is required")
@Pattern(regexp="[a-zA-Z]+", message="Last name must be alphabetical")
private String lastName;
@NotNull(message="Address is required")
@Pattern(regexp="[0-9a-zA-Z\\. ]+")
private String address;
@NotNull(message="City is required")
@Pattern(regexp="[a-zA-Z\\. ]+")
private String city;
@NotNull(message="State is required")
@Size(min=2,max=2)
@Pattern(regexp="[A-Z]{2}")
private String state;
@NotNull(message="Zip is required")
@Size(min=5,max=5)
@Pattern(regexp="\\d{5}")
private String zip;
public Customer() {
}
public Customer(Customer original) {
this.firstName = original.getFirstName();
this.middleInitial = original.getMiddleInitial();
this.lastName = original.getLastName();
this.address = original.getAddress();
this.city = original.getCity();
this.state = original.getState();
this.zip = original.getZip();
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getMiddleInitial() {
return middleInitial;
}
public void setMiddleInitial(String middleInitial) {
this.middleInitial = middleInitial;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getZip() {
return zip;
}
public void setZip(String zip) {
this.zip = zip;
}
@Override
public String toString() {
return "Customer{" +
"firstName='" + firstName + '\'' +
", middleInitial='" + middleInitial + '\'' +
", lastName='" + lastName + '\'' +
", address='" + address + '\'' +
", city='" + city + '\'' +
", state='" + state + '\'' +
", zip='" + zip + '\'' +
'}';
}
}
BeanValidatingItemProcessor 활용 예시
public class ValidationJob {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
@StepScope
public FlatFileItemReader<Customer> customerItemReader(
@Value("#{jobParameters['customerFile']}")Resource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader")
.delimited()
.names(new String[] {"firstName",
"middleInitial",
"lastName",
"address",
"city",
"state",
"zip"})
.targetType(Customer.class)
.resource(inputFile)
.build();
}
@Bean
public ItemWriter<Customer> itemWriter() {
return (items) -> items.forEach(System.out::println);
}
@Bean
public BeanValidatingItemProcessor<Customer> customerValidatingItemProcessor() {
return new BeanValidatingItemProcessor<>();
}
@Bean
public Step copyFileStep() {
return this.stepBuilderFactory.get("copyFileStep")
.<Customer, Customer>chunk(5)
.reader(customerItemReader(null))
.processor(customerValidatingItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return this.jobBuilderFactory.get("job")
.start(copyFileStep())
.build();
}
public static void main(String[] args) {
SpringApplication.run(ValidationJob.class, "customerFile=/input/customer.csv");
}
}
Custom Validation 기능 예시
/**
* UniqueLastNameValidator.java
*/
public class UniqueLastNameValidator extends ItemStreamSupport implements Validator<Customer> {
private Set<String> lastNames = new HashSet<>();
@Override
public void validate(Customer value) throws ValidationException {
if(lastNames.contains(value.getLastName())) {
throw new ValidationException("Duplicate last name was found: " + value.getLastName());
}
this.lastNames.add(value.getLastName());
}
@Override
public void update(ExecutionContext executionContext) {
executionContext.put(getExecutionContextKey("lastNames"), this.lastNames);
}
@Override
public void open(ExecutionContext executionContext) {
String lastNames = getExecutionContextKey("lastNames");
if(executionContext.containsKey(lastNames)) {
this.lastNames = (Set<String>) executionContext.get(lastNames);
}
}
}
/**
* ValidationJob.java
*/
public class ValidationJob {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
@StepScope
public FlatFileItemReader<Customer> customerItemReader(
@Value("#{jobParameters['customerFile']}")Resource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader")
.delimited()
.names(new String[] {"firstName",
"middleInitial",
"lastName",
"address",
"city",
"state",
"zip"})
.targetType(Customer.class)
.resource(inputFile)
.build();
}
@Bean
public ItemWriter<Customer> itemWriter() {
return (items) -> items.forEach(System.out::println);
}
@Bean
public UniqueLastNameValidator validator() {
UniqueLastNameValidator uniqueLastNameValidator = new UniqueLastNameValidator();
uniqueLastNameValidator.setName("validator");
return uniqueLastNameValidator;
}
@Bean
public ValidatingItemProcessor<Customer> customerValidatingItemProcessor() {
return new ValidatingItemProcessor<>(validator());
}
@Bean
public BeanValidatingItemProcessor<Customer> customerValidatingItemProcessor() {
return new BeanValidatingItemProcessor<>();
}
@Bean
public Step copyFileStep() {
return this.stepBuilderFactory.get("copyFileStep")
.<Customer, Customer>chunk(5)
.reader(customerItemReader(null))
.processor(customerValidatingItemProcessor())
.writer(itemWriter())
.stream(validator())
.build();
}
@Bean
public Job job() throws Exception {
return this.jobBuilderFactory.get("job")
.start(copyFileStep())
.build();
}
public static void main(String[] args) {
SpringApplication.run(ValidationJob.class, "customerFile=/input/customer.csv");
}
}
ItemProcessorAdaptor 예시
- 만약 UpperCaseNameService.upperCase(Customer customer) 라는 로직이 이미 있을 경우
@Bean
public Step step() {
return this.stepBuilderFactory.get("step")
.<Customer, Customer>chunk(5)
.reader(...)
.processor(itemProcessor(null))
.writer(...)
.build();
}
@Bean
public ItemProcessorAdapter<Customer, Customer> itemProcessor(UpperCaseNameService service) {
ItemProcessorAdapter<Customer, Customer> adapter = new ItemProcessorAdapter<>();
adapter.getTargetObject(service);
adapter.getTargetMethod("upperCase");
return adapter;
}
ScriptItemProcessor 예시
- 이름을 대문자로 만드는 Javascript 로직 수행
@Bean
@StepScope
public ScriptItemProcessor<Customer, Customer> itemProcessor(
@Value("#{jobParameters['script']}") Resource script
) {
ScriptItemProcessor<Customer, Customer> itemProcessor = new ScriptItemProcessor<>();
itemProcessor.setScript(script);
return itemProcessor;
}
CompositeItemProcessor 예시
- 여러개의 ItemProcessor를 Chain 형태로 수행 해야 할 때 사용
- 사용 조건:
- 이전 프로세서가 반환한 타입과 다음 프로세서의 입력 타입이 같아야 한다
- 한 프로세서가 null을 반환하면 해당 아이템은 더이상 처리되지 않는다
@Bean
public CompositeItemProcessor<Customer, Customer> itemProcessor() {
CompositeItemProcessor<Customer, Customer> itemProcessor = new CompositeItemProcessor();
itemProcessor.setDelegates(Arrays.asList(
itemProcessor1(), itemProcessor2(), itemProcessor3()
));
return itemProcessor;
}
ClassifierCompositeItemProcessor 예시
- 각각의 ItemReader를 통해 넘어온 item 데이터에 따라 다른 ItemProcessor를 활용 하고 싶을 때 사용
- ex) 우편번호가 홀수인지, 짝수인지 따라 전달되는 itemProcessor 에 따라 처리
@AllArgsConstructor
public class ZipCodeClassifier implements Classifier<Customer, ItemProcessor<Customer, Customer>> {
private ItemProcessor<Customer, Customer> oddItemProcessor;
private ItemProcessor<Customer, Customer> evenItemProcessor;
@Override
public ItemProcessor<Customer, Customer> classify(Customer classifiable) {
if (Integer.parseInt(classifiable.getZipCode()) % 2 == 0) {
return evenItemProcessor;
} else {
return oddItemProcessor;
}
}
}
@Bean
public ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor() {
ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor
= new ClassifierCompositeItemProcessor<>();
itemProcessor.setClassifier(classifier());
return itemProcessor;
}
@Bean
public Classifier classifier() {
return new ZipCodeClassifier(upperCaseItemProcessor(null), lowerCaseItemProcessor(null));
}
CustomItemProcessor 예시
- ItemProcessor Interface 구현체를 직접 생성
- return 값이 null 일 경우 해당 item을 Filter 처리(SKIP)
public class EvenFilteringItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer item) throws Exception {
return Integer.parseInt(item.getZipCode()) % 2 == 0 ? null : item;
}
}
- ItemProcessor 의 처리 결과를 조회하여 확인이 가능. (FILTER_COUNT도 확인 가능)
SELECT STEP_EXECUTION_ID as ID, STEP_NAME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT
from BATCH_STEP_EXECUTION;
Reference