티스토리 뷰
Intro
그 동안 다양한 비즈니스 로직을 가진 배치 잡들을 생성해본 결과 결국 모든 배치 시스템의 핵심은 성능과 안정성으로 귀결된다고 생각한다.
-> 시스템 부하없이 최대한 빠르게 처리하면서도, 안정적으로 데이터 정합성을 유지하는 것.
하지만 성능과 안정성은 보통의 경우 서로 상충되기 때문에, 어느 정도 저울질을 하면서 더 중요한 것을 챙기고 일정 부분 포기해야 한다.
그렇기 때문에 개발 전에 요구사항에 대해 꼼꼼하게 검토한 후, 각 기능에 대한 우선순위를 정하는 것이 가장 중요하다는 것을 배웠다.
지금까지 배치 시스템을 개발/운영하면서 알게 된 내용을 한 글로 정리해보고자 한다.
배경설명
최근에 개발한 배치에 그 동안의 공부했던 여러 방법론들을 적용해서 이 job을 예시로 정리해본다.
이 배치의 기능은 외부 시스템과 SKU라는 물류 데이터를 싱크하는 것인데, 요구사항은 다음과 같다.
1. 외부 데이터 조회 API는 두 번에 걸쳐서 조회한다.
- 요약 API로 id 목록 조회
- 조회한 id로 상세 API 호출 -> 상세 데이터 확보
2. 인터페이스 테이블에 응답 데이터를 저장한다.
3. 조회한 내용에 변경사항이 있다면, 내부 DB를 업데이트하는 API를 호출하여 데이터 변경을 반영한다.
처음 개발한 시퀀스는 다음과 같았다.
가장 일반적인 Spring Batch 구조로, Reader -> Processor -> Writer로 한 Item 단위 씩 처리했다.
처음에는 문제가 없었지만, 테스트 중 데이터 양을 늘리자 느려지기 시작했다. 2만 건이 넘어가기 시작하면서 30분 이상 소요됐고, 더 쌓이기 시작하자 1시간 이상은 기본이었다.
심지어 내부 API 호출 부분에서 일부 데이터가 누락되는 등 데이터 일관성도 깨지는 케이스도 발생했다.
한 달 정도의 기간 동안 여러 개선 작업을 진행하면서 성능 / 안정성을 향상 시키기 위한 방법을 정리하자면 다음과 같다.
성능을 개선하기 위해 적용한 방법은 총 3가지:
1. Network I/O 줄이기
2. DB I/O 줄이기
3. 멀티스레드 사용하기
안정성을 위해 적용한 방법은 총 3가지:
1. Outbox 패턴 적용
2. 멱등 API 적용
3. 비동기 HTTP + Polling
하나씩 살펴보자.
성능 개선
1. Network I/O 줄이기
배치 외부로 HTTP Request를 보내는 포인트는 총 3군데로 모두 병목이 생길 수 있는 지점이다.
- 요약 API 호출
- 상세 API 호출
- 내부 DB 업데이트 API 호출
보통 Reader, Processor, Writer로 Step 을 생성할 때 아래와 같은 코드로 사용한다.
@Bean(STEP_NAME)
@JobScope
public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder(JOB_NAME+"step", jobRepository)
.<SkuDto, SkuDto>chunk(CHUNK_SIZE, transactionManager)
.reader(SkuItemReader())
.processor(SkuItemProcessor())
.writer(SkuItemWriter())
.build();
}
ItemReader -> ItemProcessor -> ItemWriter 동작 방식을 확인해보면
- Reader는 read() 메서드의 응답으로 sku를 1건씩 반환
- Processor에서는 상세 API를 호출하여 한 sku 상세 정보를 받아옴.
- 이 작업(1~2)을 Chunk Size 만큼 반복한 후, Writer에서 chunk번 업데이트 API 호출
즉,
- ItemReader와 ItemProcessor는 1:1 동작
- ItemWriter는 chunkSize만큼 모은 후 일괄 실행
이렇기 때문에 현 상태에서는 Reader와 Processor에서는 chunk size 만큼 API 호출을 하고 있던 것이다.
이를 고치기 위해 chunk 단위를 일반 Item에서 List<Item> 으로 변경했다.
@Bean(STEP_NAME)
@JobScope
public Step step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder(JOB_NAME+"step", jobRepository)
.<List<SkuDto>, List<SkuDto>>chunk(1, transactionManager)
.reader(SkuItemReader())
.processor(SkuItemProcessor())
.writer(SkuItemWriter())
.build();
}
API call 방식도 body에 한꺼번에 여러 Id를 담는 방식으로 변경하여 모두 일괄 조회할 수 있게 되었다.
2. DB I/O 줄이기
총 3가지 방법으로 DB I/O를 줄였다.
(1) ItemReader Pagination 처리
일반적으로 Cursor 기반 ItemReader들은 성능이 좋지만 데이터 크기가 커지면 메모리 이슈가 발생한다.
이 배치도 마찬가지로 조회 대상 테이블의 데이터가 매우 크기 때문에 PagingItemReader를 사용했는데, 기존 배치 잡을 참고하여 JpaPagingItemReader를 적용했었다.
하지만 여기서도 엄청난 병목이 발생하는데, 이건 Page가 누적될 수록 오래 걸리는 이슈가 발생하기 때문이다.
이런 ItemReader를 MySQL에서 사용하면 limit, offset 구문을 사용해 데이터를 조회하는데,
-- 조회 결과: 100건, 조회 속도: 매우 느림 (환경에 따라 다르지만 최소 수 십 초는 걸림)
select * from student where gender = 'MALE' limit 50000000, 100
OFFSET 값이 커질수록 쿼리 속도가 느려지는 이유는 MySQL이나 대부분의 RDBMS에서 OFFSET 이후의 행만 가져오는 것이 아니라, OFFSET 이전의 모든 행을 먼저 스캔해야 하기 때문이다.
이렇기 때문에 OFFSET을 항상 0으로 유지하는 커스텀 PagingItemReader가 필요하다.
- PK(id)값 오름차순으로 정렬
- 3번 Page를 조회한다면 2번 Page의 마지막 id값인 5235를 사용해 ‘where id > 5235’를 쿼리에 자동으로 추가
- offset을 0으로 유지
이렇게 10만건 이상의 row를 scan 해야하는 쿼리들은 모두 ZeroOffsetItemReader로 조회하도록 변경했다.
(2) Batch INSERT
MyBatis를 활용한 Batch INSERT
MyBatis에서는 ExecutorType.BATCH를 설정하여 여러 insert 쿼리를 한 번에 처리할 수 있다.
@Bean
public ItemWriter<CustomerDto> customerBatchWriter() {
return items -> {
SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
try {
CustomerMapper mapper = session.getMapper(CustomerMapper.class);
for (CustomerDto item : items) {
mapper.insertCustomer(item);
}
session.commit();
} catch (Exception e) {
session.rollback();
throw e;
} finally {
session.close();
}
};
}
JDBC 배치 처리이므로 한 번의 네트워크 round-trip으로 수백 건의 데이터를 처리할 수 있어 매우 빠르다.
JPA를 활용한 Batch INSERT
JPA는 기본적으로 1건 단위로 SQL을 실행하므로 hibernate.jdbc.batch_size를 설정하고 flush/clear를 수동으로 처리한다.
@Bean
public ItemWriter<Customer> customerJpaBatchWriter(EntityManagerFactory emf) {
return new JpaItemWriterBuilder<Customer>()
.entityManagerFactory(emf)
.usePersist(true)
.build();
}
영속성 컨텍스트의 사이즈가 커지면 메모리 문제가 발생하므로 주기적인 flush()와 clear() 호출이 필요하다.
배치에서는 개인적으로 JPA는 별로 좋은 수단은 아닌거 같다는 생각이 든다.
우선 배치라는 시스템 특성상 거의 웬만한 상황에서는 필요한 칼럼만 Update하는 상황이다 -> 즉 영속성 컨텍스트가 필요한 경우가 거의 없다. 또한 위 batch insert 를 할 때 flush / clear 등 수동으로 처리해야하는 등 성가신 부분들이 많다.
(3) In Update
앞서 언급한 Batch INSERT와 비슷하게 Update하는 로직도 모두 WHERE ~ IN 절로 일괄로 업데이트 하게 되어 DB I/O 횟수를 줄였다.
3. 멀티스레드 사용하기
TaskExecutor 기반 멀티 스레드 설정
멀티 스레드 처리는 Step 수준에서 설정하며, taskExecutor 속성을 활용하여 병렬 처리를 적용한다.
@Bean
public Step multiThreadedStep() {
return stepBuilderFactory.get("multiThreadedStep")
.<Customer, Customer>chunk(100)
.reader(customerItemReader())
.processor(customerItemProcessor())
.writer(customerItemWriter())
.taskExecutor(threadPoolTaskExecutor())
.throttleLimit(4) // 동시에 최대 4개의 스레드로 처리
.build();
}
@Bean
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("batch-thread-");
executor.initialize();
return executor;
}
단일 Step 내에서 멀티 스레드를 적용하여 각 청크 단위를 병렬로 처리한다.
멀티 스레드 적용 시 다음과 같은 주의사항이 존재한다.
- Reader는 스레드 세이프 해야 함
JdbcPagingItemReader와 같이 스레드 세이프하지 않은 Reader는 병렬 처리에 적합하지 않으므로 SynchronizedItemStreamReader로 감싸거나 Partitioning 방식으로 전환을 고려해야 한다. - DB 락 및 동시성 이슈
DB에서 동일한 데이터를 동시에 수정하거나 조회할 경우 락이 발생할 수 있다. 가능한 한 Reader/Writer의 동시성 처리를 명확히 분리해야 한다. - JobRepository는 병렬 처리 시 thread-safe 보장 필요
별도 트랜잭션 관리 및 격리 수준 설정이 중요하다.
안정성 개선
이 배치는 총 2가지 작업으로 나뉘어져 있다.
1. IF 테이블에 외부 API를 통해 받은 데이터를 적재.
2. 내부 DB에 변경사항 업데이트하기.
public void write(@NonNull Chunk<? extends List<SkuDetail>> chunk) throws Exception {
// 코드 생략
List<SkuSyncResponse> skuSyncResponses = new ArrayList<>();
for (Map.Entry<String, List<SkuSyncRequest>> entry : requestsByAccountId.entrySet()) {
List<SkuSyncRequest> requests = entry.getValue();
try {
skuSyncResponses.addAll(fulfillmentService.getSkuSyncResults(requests));
} catch (Exception e) {
log.error("sku sync 실패: {}", e);
}
}
}
기존 방식에서는 시퀀스에서 보다시피 Writer에서 2번을 API 호출을 통해 하고 있었다.
여기서 문제가 생긴다. 1번과 2번이 Transactional하게 수행되어야 하는데, 너무 많은 건들이 한꺼번에 요청 되거나, 내부 서버에 이슈가 생겨서 API에 정상적인 응답이 불가능할 경우, 데이터 정합성 문제가 발생할 수 있는 것이다.
1. Outbox 패턴 적용
API 호출이 실패하면 전체 트랜잭션이 롤백되어 데이터를 손실할 위험이 있기 때문에 위해 Outbox Pattern을 적용했다.
- 외부 API 호출 대신 outbox 테이블에 status = 'PENDING'으로 기록
- 별도 JOB에서 PENDING 상태인 데이터를 API로 전송
- 성공 시 status = 'SYNCED', 실패 시 status = 'FAILED'로 상태 업데이트
이를 통해 DB저장과 메세지 전송 간의 이슈를 분리하고, 실패 시에도 재시도나 복구가 가능하도록 했다.
OUTBOX 스키마
CREATE TABLE sku_outbox (
id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
sku_id VARCHAR(20) NOT NULL, -- SKU 식별자
event_type VARCHAR(50) NOT NULL, -- 이벤트 종류, 예: 'SKU_SYNC'
payload JSON NOT NULL, -- 실제 전송할 데이터
correlation_id VARCHAR(64), -- API로 받은 correlationId
status VARCHAR(20) NOT NULL, -- 처리 상태: PENDING, SUBMITTED, SYNCED, FAILED
retry_count INT NOT NULL DEFAULT 0, -- 재시도 횟수
error_message TEXT, -- 실패 시 에러 메시지
created_at DATETIME NOT NULL DEFAULT NOW(), -- 레코드 생성 시각
submitted_at DATETIME, -- API에 전송한 시각 (202 Accepted 응답 받은 시각)
completed_at DATETIME, -- 최종 완료 시각
INDEX idx_status_created (status, created_at),
INDEX idx_correlation (correlation_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
2. 멱등 API 적용
SKU 업데이트 API를 멱등하게 구성하여, 동일한 id 에 대한 요청이 여러 번 전송되더라도 데이터 중복 없이 한 번만 처리되도록 구현했다.
이를 통해 네트워크 재시도, 타임아웃 재전송 등의 상황에서도 시스템의 무결성과 일관성을 유지할 수 있게 되었다.
Chunk Size를 1로 고정하고, List형태로 배치를 처리하다 보니 스프링 배치가 기본으로 제공하는 Retry나 Skip 정책을 사용하기 매우 어렵게 되었다.
API 내 1000개의 업데이트 요청 데이터가 담겼는데 서버에서 950개 성공했다고 한다면, 나머지 실패한 50개에 대한 재처리를 다시 pinpoint 하는게 매우 어려워진 것이다.
하지만 재처리를 pinpoint하는 방식이 아닌 무식하게 일괄로 다시 요청해도 데이터 일관성을 유지한다면 어떨까?
중복요청이 들어오긴 하겠지만 실제로 재시도하는 것은 실패한 50개일테니 일관성 측면에서는 큰 문제가 없다.
3. 비동기 HTTP + Polling
기존에는 Fulfillment API 호출 후 응답을 기다리는 구조였으나, Fulfillment 서비스 처리 시간이 길어지면서 클라이언트(batch job)가 timeout되거나 중복 전송되는 문제가 있었다.
이를 해결하기 위해 비동기 HTTP 처리 + Polling 방식을 적용했다.
- Batch Job에서는 API에 요청하고 202 Accepted + correlationId를 받음
- 상태는 SUBMITTED으로 저장하고 즉시 다음 작업으로 진행
- 별도 Poller가 주기적으로 /status/{correlationId}를 호출하여 완료 여부 확인
- 결과에 따라 SYNCED 또는 FAILED로 상태 변경
이 방식은 클라이언트와 서버 간 timeout 이슈를 근본적으로 방지하고, 비동기 아키텍처를 통해 시스템의 안정성과 응답성을 향상시킬 수 있다.
변경된 시퀀스는 다음과 같다.
결론
자세한 내용은 모두 밝히기 어렵지만, 이 배치로 데이터 약 5만 건 기준 1시간 이상 걸리던 작업이 현재 평균 3분대 안으로 개선되었다.
물론 배치 자체가 두 개로 나뉘어졌고, 비동기로 바로 202 응답을 받는 방식으로 변경되었기 때문에 전후를 이렇게 시간대로 단순히 비교하는 것이 공정한(?) 방식이진 않을 수 있겠다.
하지만 확실한 것은 I/O를 줄이고 각 시퀀스마다의 커플링을 줄였기 때문에 훨씬 더 모니터링하기 편하고 관리 포인트가 제대로 잡혔기 때문에 배치로 인한 운영 업무 소요시간이 줄었기 때문에 개인적으로 매우 유의미한 결과라고 생각한다.
'실무 개발 > 삽질 기록' 카테고리의 다른 글
MSA 환경에서 로그 조회 시간 단축하기 (0) | 2024.07.22 |
---|---|
JVM OOM 서비스 중단 회고 (0) | 2024.07.12 |
okhttp3 IOException: unexpected end of stream on Connection (0) | 2023.10.31 |
JWT 토큰으로 Stateless한 인증하기 (0) | 2023.06.05 |
java.util.regex.PatternSyntaxException: Dangling meta character (replace(), replaceAll()의 차이) (0) | 2022.01.21 |
- Total
- Today
- Yesterday
- JobInstance
- PatternSyntaxException
- springboot
- 2020 KAKAO
- nginx 내부
- behavior parameterization
- 2019 Kakao Blind
- KAKAO 2021
- decorator
- Java
- runidincrementer
- Kakao Blind
- 2021
- zipkin
- Java #JIT #JVM
- WORE
- 카카오코테
- okhttp3
- 프로그래밍 모델
- 카카오 코테
- IOC
- Java #GC #가비지콜렉터 #Garbage Collector
- spring cloud sleuth
- jvm
- 디자인패턴
- 스프링
- 모던 자바 인 액션
- 카카오
- 코테
- Spring
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 |