최근 프로젝트에서 Spring Batch를 사용하여 데이터를 처리하는 과정에서 새로운 요구사항이 발생했어요.
기존에는 Chunk 방식으로 Reader → Processor → Writer를 통해 테이블 정보를 수집하고 처리했는데, 내부 프로세스의 새로운 요구사항에 맞추기 위해 기존 기능은 유지하면서도 Chunk 방식을 Tasklet으로 전환하여 upsert를 처리해야 했어요.
Upsert 란?
데이터베이스에 존재하지 않는 경우 삽입하고, 존재하는 경우 업데이트하는 방식이에요.
기존 Chunk 방식
@JobScope
@Bean
public Step tabCurrStep() throws Exception {
return new StepBuilder("tabCurrStep", jobRepository)
.<CatalogTabHist, CatalogTabCurr>chunk(CHUNK_SIZE, transactionManager)
.reader(tabCurrReader(null, null))
.processor(catalogTabCurrProcessor())
.writer(catalogJpaWriter())
.taskExecutor(taskExecutor())
.build();
}
기존의 구성된 Chunk 방식은 다음과 같은 구조로 이루어져 있어요
- Reader: 데이터를 읽어오는 역할
- Processor: 데이터를 가공하는 역할
- Writer: 데이터를 저장하는 역할
이 방식은 데이터 처리량이 많고, 처리 과정에서 데이터 변환이 필요한 경우에 매우 유용해요.
Postgres Upsert
Postgres 15버전 이상은 merge 문을 이용해서 구현하면 되는데 현재 프로젝트에 사용중인 버전은 15버전보다 낮아서 Insert ~ on conflict 문을 사용했어요.
아래는 PostgreSQL 의 Merge와 Insert ~ on conflict에 대해 간략하게 정리했어요.
PostgreSQL의 MERGE 문
PostgreSQL 15부터 MERGE 문이 도입되었는데, MERGE 문은 조건에 따라 데이터베이스의 테이블을 업데이트, 삽입 또는 삭제할 수 있는 문법이에요. 주로 "upsert" 작업(존재하지 않으면 삽입하고, 존재하면 업데이트)에서 사용돼요. MERGE 문은 다양한 조건에 따라 복잡한 로직을 구현할 수 있어요.
MERGE 문 기본 구문 검색 결과를 기반으로 다른 테이블에 INSERT
MERGE INTO target_table USING source_table
ON target_table.key = source_table.key
WHEN MATCHED THEN
UPDATE SET target_table.column1 = source_table.column1
WHEN NOT MATCHED THEN
INSERT (column1, column2) VALUES (source_table.column1, source_table.column2);
두 개의 테이블이 있다고 가정했는데, source_table에서 특정 조건을 만족하는 데이터를 검색하고, 그 결과를 target_table에 삽입해요.
PostgreSQL의 INSERT ~ ON CONFLICT 문
PostgreSQL의 INSERT ... ON CONFLICT 문은 데이터베이스에 데이터를 삽입할 때, 특정 열에서 충돌이 발생하면 이를 처리할 수 있는 강력한 기능을 제공해요. 이 문법을 사용하면 데이터를 삽입하거나 업데이트할 수 있어요.
코드
다음은 source_table에서 데이터를 선택해 target_table에 삽입할 때, id 열에서 충돌이 발생하면 해당 행을 업데이트하는 코드예요.
-- source_table에서 데이터를 선택하여 target_table에 삽입 또는 업데이트
INSERT INTO target_table (id, name, value)
SELECT id, name, value
FROM source_table
ON CONFLICT (id)
DO UPDATE SET
name = EXCLUDED.name,
value = EXCLUDED.value,
updated_at = CURRENT_TIMESTAMP;
- INSERT INTO target_table (column1, column2, ...) VALUES (value1, value2, ...): 데이터를 삽입해요.
- ON CONFLICT (conflict_column): conflict_column 열에서 충돌이 발생할 때 처리할 방법을 지정해요.
- DO UPDATE SET column1 = EXCLUDED.column1, ...: 충돌이 발생하면, EXCLUDED 테이블의 값을 사용해 지정된 열을 업데이트해요.
이 문법을 사용하면 중복된 키가 발생할 때, 데이터베이스에 데이터를 삽입하거나 업데이트하는 작업을 효율적으로 처리할 수 있어요. 이를 통해 데이터 무결성을 유지하면서 성능을 최적화할 수 있어요.
저는 Upsert 코드를 Spring Data JPA 가 제공하는 Native 쿼리로 작성했어요.
Upsert 코드 작성
@Modifying(clearAutomatically = true)
@Query(value = """
INSERT INTO catalog_tab_curr (
catalog_tab_curr_id,
// .. 쿼리생략
)
SELECT
nextval('qbig.CATALOG_TAB_CURR_SEQ'),
:collectorDate,
:collectorDate,
// .. 쿼리생략
FROM catalog_tab_hist h
WHERE
h.collector_dt = :collectorDate
AND h.instance_id = :instanceId
ON CONFLICT (catalog_id)
DO UPDATE SET
update_dt = :collectorDate ,
instance_id = EXCLUDED.instance_id ,
// .. 쿼리생략
""", nativeQuery = true)
void currentTableMerge(@Param("collectorDate") LocalDateTime collectorDate, @Param("instanceId") String instanceId);
- Native 쿼리에서도 JPA가 제공하는 Named Parameters를 사용할 수 있어요. ****
- Named Parameters는 쿼리 문자열 내에서 : 뒤에 파라미터 이름을 명시해서 사용하는 파라미터예요. 이렇게 하면 쿼리를 더 읽기 쉽게 만들고, 파라미터의 순서와 관계없이 명시적으로 매핑할 수 있어요.
- @Modifying(clearAutomatically = true): 이 애노테이션은 데이터베이스에서 데이터를 수정하는 쿼리를 실행할 때 사용해요. clearAutomatically = true 속성은 이 메서드가 호출된 후에 영속성 컨텍스트를 자동으로 클리어해줘요.
- @Query: 이 애노테이션은 쿼리를 정의할 때 사용해요. **nativeQuery = true**로 설정해서 네이티브 SQL 쿼리를 사용할 수 있어요.
Tasklet으로 전환
Tasklet은 Spring Batch에서 단일 작업을 정의하는 구성 요소로, 보다 유연하게 커스텀 로직을 구현할 수 있어요. Tasklet을 사용하여 upsert를 처리하기 위해 다음과 같은 단계를 따랐어요.
- Tasklet 구현: 기존 Chunk 단계에서 수행하던 읽기, 처리, 쓰기 로직을 Tasklet 내에 구현했어요.
- 업서트 로직: 데이터베이스에 데이터를 upsert하는 로직을 구현했어요.
Tasklet 구현
아래는 Tasklet을 사용하여 upsert를 처리하는 코드예요.
@JobScope
@Bean
public Step tabCurrStepV2(@Value("#{jobParameters[instanceId]}") String instanceId, @Value("#{jobParameters[collectorDate]}") LocalDateTime collectorDate, @Value("#{jobExecutionContext[currentCollectorId]}") Long currentCollectorId) throws Exception {
return new StepBuilder("tabCurrStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
//1 . Processor 에서 처리하던 과정은 여기서 처리
//... 코드 생략
// 2. UPSERT 처리
catalogTabCurrRepository.currentTableMerge(collectorDate,instanceId);
return RepeatStatus.FINISHED; // TASKLET 정상 종료
}, transactionManager)
.build();
}
- Job Parameters와 Execution Context: jobParameters는 Spring Batch에서 Job 실행 시 전달된 파라미터를 의미하고, jobExecutionContext는 Job 실행 중에 공유되는 컨텍스트 값을 의미해요.
- Spring Expression Language (SpEL): @Value 애노테이션 내부에서 #{} 구문을 사용해서 SpEL 표현식을 처리했어요. 이렇게 하면 Job 파라미터와 ExecutionContext 값을 쉽게 주입받을 수 있어요.
저는 Tasklet을 따로 분리해서 관리하는것을 선호하기 때문에 람다식으로 작성한 Tasklet을 분리하는 작업을 진행했어요.
람다식으로 작성한 Tasklet을 분리하는 단계는 총 3단계로 진행해요.
Tasklet 분리
1. Tasklet의 구현체 클래스 생성
@RequiredArgsConstructor
public class CatalogCurrTasklet implements Tasklet {
// 생성자 주입 필드 생략
@Value("#{jobParameters[instanceId]}")
private String instanceId;
@Value("#{jobParameters[collectorDate]}")
private LocalDateTime collectorDate;
@Value("#{jobExecutionContext[currentCollectorId]}")
private Long currentCollectorId;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
//1 . Processor 에서 처리하던 과정은 여기서 처리
//... 생략
// 2. UPSERT 처리
catalogTabCurrRepository.currentTableMerge(collectorDate,instanceId);
return RepeatStatus.FINISHED; // TASKLET 정상 종료
}
}
2. 작성한 구현체를 스프링 빈으로 등록
@Bean
@StepScope
public Tasklet catalogCurrTasklet() {
return new CatalogCurrTasklet(
// 생성자 주입
);
}
3. 기존에 작성한 Step 코드 수정
@JobScope
@Bean
public Step tabCurrStepV2() throws Exception {
return new StepBuilder("tabCurrStepV2", jobRepository)
.tasklet(catalogCurrTasklet(),transactionManager)
.build();
}
이렇게 만든 Tasklet Step을 기존에 사용하던 Chunk Step 자리에 바꿔줬어요.
항상 코드를 작성했으면 잘 작동하는지 테스트를 해봐야 하는데요. 스프링 배치에서 제공해주는 @SpringBatchTest 어노테이션을 테스트 클래스에 작성하면 쉽고 빠르게 테스트를 진행할 수 있어요.
@SpringBatchTest
@SpringBootTest
class BatchJobTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@DisplayName("Batch Job 테스트")
@Test
public void BATCH_JOB_테스트() throws Exception {
LocalDateTime collectorDate = LocalDateTime.now();
JobParameters jobParameters = new JobParametersBuilder()
.addString("instanceId", "00001")
.addLocalDateTime("collectorDate", collectorDate)
.toJobParameters();
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
//then
Assertions.assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
}
테스트 코드 구성은 Job을 실행할 때 필요한 파라미터값을 전달해 주는 JobParameters와 스프링 배치에서 제공 해주는 JobLauncherTestUtils 를 사용해서 간단하게 구현 했어요.
JobExecution 객체는 종료된 Job의 다양한 정보를 가지고 있어요. 위와 같은 코드를 작성한 후 배치가 정상종료 되지 않았다면 아래와 같은 에러가 발생해요.
마무리하며
이번에는 Spring Batch를 사용해서 데이터를 처리하는 과정에서 발생한 새로운 요구사항을 해결하기 위해, 기존 Chunk 방식을 Tasklet으로 전환하는 방법을 알아봤어요.
또한, JPA에서 Named Parameters를 사용하는 방법과 Spring Expression Language(SpEL)를 통해 Job 파라미터를 주입받는 방법도 함께 살펴봤어요.
프로젝트의 요구사항이 변화함에 따라 Batch 처리 로직도 유연하게 조정할 수 있어야 하므로, Tasklet과 Chunk 방식을 적절히 활용하여 최적의 솔루션을 찾는 것이 중요해요.
도움이 되셨기를 바라며, 감사합니다!