[Spring Batch] JpaPagingItemReader + JpaItemWriter를 사용했 때겪었던 문제점 (1)
최근 배치를 짜고 운영하다가 참으로 이상한 현상을 겪었는데, 이를 JpaPagingItemReader, JpaItemWriter의 내부 동작과 함께 원인을 정리해보려 한다. (aka. 나의 삽질 기록)
내 삽질 기록을 정리한 글을 통해 말하고자 하는 내용을 미리 정의하고 넘어가자면,
배치 reader로 조회한 엔티티를 그대로 사용하지말고 특정 필요한 값만 dto로 뽑아쓰는 방식을 취하자이다 (엔티티의 수정 x).
이 글은 내가 배치 reader로 조회한 엔티티 그대로를 직접 수정, dirty checking을 이용하다 생긴 문제로,
애초에 이렇게 구현하면 안된다가 전제로 깔린다.
그냥 배치에서 엔티티를 지직 볶고 하다가 깨닫게 된 몇가지 내용들을 정리한 글로 봐주길.
문제상황 예시
먼저 Coupon(쿠폰)이라는 아래와 같은 엔티티가 있다고 합시다.
@Entity
class Coupon(
id: Long? = null,
serialNumber: String,
status: CouponStatus
) {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", nullable = false)
val id: Long? = id
@Column(name = "serial_number", nullable = false)
val serialNumber: String = serialNumber
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
var status: CouponStatus = status // 상태값: CREATED, REGISTERED, DISCARD
protected set
@OneToMany(mappedBy = "coupon", fetch = FetchType.LAZY, cascade = [CascadeType.PERSIST, CascadeType.MERGE])
val histories: MutableList<CouponHistory> = mutableListOf()
// 폐기
fun discard(): Coupon = apply {
this.status = CouponStatus.DISCARD
histories.add(CouponHistory.of(this)) // 자식 엔티티에 하나를 추가한다
}
}
이 쿠폰에 대해서 현재 테이블에 있는 모든 쿠폰들폐기 처리를 해야한다는 요구사항이 생겼다.
즉, coupon 테이블에 있는 모든 데이터들을 가져와서 status를 DISCARD 상태로 만들어야 한다.
더불어, 이 때 쿠폰의 status가 변경되었음을 기록하기 위해 자식 엔티티인 coupon_history 테이블에 데이터를 추가해야한다.
이 비니스가 구현되어 있는 것이 바로 위의 discard() 메서드이다.
(부모 엔티티가 Coupon, 자식 엔티티로 CouponHistory 연관관계가 맺어져 있고 Cascade 옵션이 걸려있다.)
CouponHistory 엔티티 코드 자체는 중요하지 않기에... 더보기로만 남긴다.
CouponHistory
@Entity
class CouponHistory(
id: Long? = null,
coupon: Coupon,
status: CouponStatus
) {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", nullable = false)
val id: Long? = id
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(
name = "coupon_id"
)
val coupon: Coupon = coupon
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
var status: CouponStatus = status
protected set
companion object {
fun of(
coupon: Coupon
): CouponHistory =
CouponHistory(
coupon = coupon,
status = coupon.status
)
}
}
자 그럼 모든 쿠폰을 폐기 처리하는 아주 심플한 배치를 만들어보자.
@Configuration
class CouponDiscardBatchConfiguration(
val jobBuilderFactory: JobBuilderFactory,
val stepBuilderFactory: StepBuilderFactory,
val entityManagerFactory: EntityManagerFactory,
@Value("\${chunkSize:$DEFAULT_CHUNK_SIZE}")
val chunkSize: Int
) {
@Bean
fun job(): Job =
jobBuilderFactory.get(JOB_NAME)
.start(discardVouchersStep())
.build()
@Bean
fun discardVouchersStep(): Step =
stepBuilderFactory.get(DISCARD_VOUCHERS_STEP_NAME)
.chunk<Coupon, Coupon>(chunkSize)
.reader(reader())
.processor(processor())
.writer(writer())
.build()
@StepScope
@Bean
fun reader(): JpaPagingItemReader<Coupon> =
JpaPagingItemReaderBuilder<Coupon>()
.name("reader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT c FROM Coupon c order by id desc")
.build()
@StepScope
@Bean
fun processor(): ItemProcessor<Coupon, Coupon> =
ItemProcessor {
it.discard()
}
@Bean
@StepScope
fun writer(): JpaItemWriter<Coupon> {
val writer: JpaItemWriter<Coupon> = JpaItemWriter<Coupon>()
writer.setEntityManagerFactory(entityManagerFactory)
return writer
}
companion object {
const val DEFAULT_CHUNK_SIZE = 10
const val JOB_NAME = "couponDiscardJob"
}
}
정말 간단허접한 구조.
현재 coupon 테이블에는 55개의 데이터가 저장되어 있고, 청크 사이즈는 10으로 잡아서 다음과 같은 작업이 일어난다.
(실제 배치는 이렇게 적은 데이터, 작은 청크 사이즈로 이루어지지 않는다!! 문제상황을 설명하기 위한 예시일뿐..)
- reader : JpaPagingItemReader 를 사용해서 coupon을 읽어오고
- processor: coupon을 discard() 메서드로 폐기 처리를 진행하고
- writer : JpaItemWriter 사용하여 영속화 시킨다.
자 그럼 해당 배치를 실행하면, 모든 쿠폰은 폐기 처리되고, 자식 엔티티인 쿠폰의 이력 테이블에도 50개에 대한 쿠폰의 이력이 1개씩 쌓일거라 예상한다. 배치 테스트를 돌려보면...
🔥 문제
왜 때문인지 자식 엔티티인 histories에는 하나만 추가했는데... 중복으로 insert가 되고 있었다.
또 심지어 55개 데이터를 청크 사이즈 10으로 읽어오다가 마지막 청크는 5개 쿠폰에 대한 history는 정상적으로 한개씩만 쌓이고 있다.
왜? JpaPagingItemReader + JpaItemWriter의 동작 순서를 코드로 확인해보기
일단 각각의 주요 코드를 대강 스윽 훑어 봅시다.
JpaPagingItemReader
public class JpaPagingItemReader<T> extends AbstractPagingItemReader<T> {
// ...
@Override
@SuppressWarnings("unchecked")
protected void doReadPage() {
EntityTransaction tx = null;
if (transacted) {
tx = entityManager.getTransaction();
tx.begin();
entityManager.flush();
entityManager.clear();
}//end if
Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
if (parameterValues != null) {
for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
query.setParameter(me.getKey(), me.getValue());
}
}
if (results == null) {
results = new CopyOnWriteArrayList<>();
}
else {
results.clear();
}
if (!transacted) {
List<T> queryResult = query.getResultList();
for (T entity : queryResult) {
entityManager.detach(entity);
results.add(entity);
}//end if
} else {
results.addAll(query.getResultList());
tx.commit();
}//end if
}
// ...
@Override
protected void doClose() throws Exception {
entityManager.close();
super.doClose();
}
}
데이터를 읽어오는 doReadPage() 메서드는 다음과 같이 구현되어 있다.
해당 메서드를 호출하는 부분은 이 클래스가 상속하고 있는 AbstractPagingItemReader의 doRead() 메서드이다.
JpaItemWriter
public class JpaItemWriter<T> implements ItemWriter<T>, InitializingBean {
private EntityManagerFactory entityManagerFactory;
private boolean usePersist = false;
// ...
@Override
public void write(List<? extends T> items) {
EntityManager entityManager = EntityManagerFactoryUtils.getTransactionalEntityManager(entityManagerFactory);
if (entityManager == null) {
throw new DataAccessResourceFailureException("Unable to obtain a transactional EntityManager");
}
doWrite(entityManager, items);
entityManager.flush();
}
protected void doWrite(EntityManager entityManager, List<? extends T> items) {
if (logger.isDebugEnabled()) {
logger.debug("Writing to JPA with " + items.size() + " items.");
}
if (!items.isEmpty()) {
long addedToContextCount = 0;
for (T item : items) {
if (!entityManager.contains(item)) {
if(usePersist) {
entityManager.persist(item);
}
else {
entityManager.merge(item);
}
addedToContextCount++;
}
}
if (logger.isDebugEnabled()) {
logger.debug(addedToContextCount + " entities " + (usePersist ? " persisted." : "merged."));
logger.debug((items.size() - addedToContextCount) + " entities found in persistence context.");
}
}
}
}
write()에서 doWrite() 메서드를 호출하며 reader, processor를 거쳐 넘어온 items를 영속화 시킨다.
(여기서 중요한 내용은 아니지만 usePersist 값은 기본값이 false이기 때문에 이 값을 변경하지 않는 이상 항상 em.merge()가 호출된다.)
그럼 디버깅을 걸고 하나하나씩 좀 더 분석해보자.
사실 문제는 데이터의 양과 관계 없이 청크가 2번 이상 도느냐에 달린 문제이기 때문에,
빠른 디버깅을 위해 전체 데이터 5개, chunkSize는 2로 줄여보았다. (그럼 총 청크가 size 2, 2, 1 이렇게 3번 돌 것이다.)
문제의 핵심 부분에 디버깅을 걸어놓았으니 흐름을 쭉 따라오면 된다.
JpaPagingItemReader
맨 처음으로 JpaPagingItemReader#doOpen 이라는 메서드가 시작된다.
JpaPagingItemReader 는 인스턴스 변수로 entityManager를 가지고 있는데, 이 메서드가 호출되면서
batch configuration에서 설정했던 entityManagerFactory으로 entityManager를 만들고 인스턴스 변수에 할당한다.
참고로 doOpne() 메서드는 배치 실행에서 딱 한번만 실행된다. (reader를 open 하는거니깡)
AbstractPagingItemReader
AbstractPagingItemReader#doRead 가 호출된다.
(현재 pageSize는 파라미터로 준 chunkSize (=2)와 동일하다. )
아직은 reader에서 읽어온 데이터를 보관하는 results가 null이기 때문에 조건식을 만족해서 doReadPage 메서드가 호출된다.
JpaPagingItemReader
reader에서 트랜잭션을 시작하고, 흥미로운 점은 entityManager를 한번 flush 하고 clear 해준다. (?)
그리고 작성한 쿼리를 생성한 것을 확인할 수 있다.
transacted 는 기본값이 true이기 때문에 항상 else 절을 타게 된다.
쿼리의 결과값은 보이는 대로 pageSize 만큼 조회해온(=chunkSize) Coupon 데이터임을 확인할 수 있다.
이를 results 변수에 넣고 트랜잭션을 커밋한다.
ItemProcessor
데이터를 페이지 사이즈만큼 읽어 왔으면, processor에서 상태를 변경한다.
(글 상단의 Coupon#discard 참고. 다시한번 언급하면 여기서 status를 변경하고, 자식 엔티티를 하나 추가한다.)
JpaItemWriter
write에서 doWrtie가 호출되면서, writer로 넘어온 엔티티를 하나하나씩 merge 시킨다.
그림에 써놓았듯이, entityManager.merge(item)을 호출할 때 자식 엔티티(history)에 대한 insert 문이 발생된다.
(이는 CouponHistory 엔티티의 id 전략이 IDENTITY라서 id 값을 바로 채워넣기 위해 한번 insert 쿼리를 날리는 것)
이는 영속성 컨텍스트의 쓰기지연 SQL 저장소에 저장이 된다.
wirte 메서드가 끝나면 coupon 테이블이 업데이트 되고, coupon_history에 데이터가 추가되는 것을 DB에서 확인할 수 있었다.
(entityManager.flush()가 호출이되고 메서드가 끝나면서 스프링 배치가 관리하는 청크 단위의 트랜잭션이 끝나며 쓰기지연 SQL 저장소에 있던 SQL이 디비에 날라가고 커밋이 되며 최종적으로 DB에 반영되는 것)
AbstractPagingItemReader
이제 두번째 청크이다.
doReadPage가 호출된다.
JpaPagingItemReader
음 근데 이상한 점이 있다. doReadPage() 에서 flush를 호출하는 순간, 자식 엔티티에 대한 insert 쿼리가 또 발생하는 것을 볼 수 있다.
그리고 순차적으로 실행이 되다가 끝자락의 tx.commit을 만나면 해당 쿼리가 DB에 반영되고 이 시점에서 같은 자식 엔티티가 또 저장되는 현상을 발견할 수 있었다.
처음 나의 잘못된 추측 (맥락 잘못 짚어서 뻘소리다.. 아까우니 더보기 글로만 냄기기..)
entityManage.merge()의 특성
잠깐 그럼 writer에서 호출했던 merge를 다시 짚고 넘어가보자.
https://stackoverflow.com/questions/1069992/jpa-entitymanager-why-use-persist-over-merge
Merge returns the managed instance that the state was merged with. It does return something that exists in PersistenceContext or creates a new instance of your entity. In any case, it will copy the state from the supplied entity, and return a managed copy. The instance you pass in will not be managed (any changes you make will not be part of the transaction - unless you call merge again). Though you can use the returned instance (managed one).
간단히 말하면, merge는 엔티티 매니저가 관리하는 인스턴스를 반환하고, 매개변수로 넘겨준 인스턴스는 관리 대상이 아니다.
val mergedItem = entityManager.merge(item)
// mergedItem는 영속성 컨텍스트가 관리하는 친구이지만, item은 관리되지 않음(id = null)
doWrtie 메서드를 다시 보면, merge를 호출하고 엔티티 매니저가 관리하는 인스턴스를 받지 않고,
엔티티 매니저에 관리되지 않는 Item 만이 영속성 컨텍스트에 계속 남아있다.
디버깅 콘솔도 보면 histories에 있는 엔티티는 Id 값이 없고 영속화 된 인스턴스가 아니고, 엔티티매니저가 관리하는 인스턴스도 아님을 알 수 있다.
이 item이 엔티티 매니저에 남은 상황에서 다음 청크가 남아있다면, 새로운 청크단위의 트랜잭션이 시작되고 doReadPage에서 다시 flush 호출을 하니 엔티티 매니저는 해당 인스턴스가 아직 영속화 되지 않은 아이라고 생각해서 한 번 더 영속화를 시키고 이를 커밋하기 때문에!!!!! 중복된 데이터가 저장되는 것이다...
➕ 음 근데 그럼 배치에서 청크 단위의 트랜잭션을 관리하는데, 왜 JpaPagingItemReader만 내부에서 따로 트랜잭션을 또 열고 커밋하나..?
https://jojoldu.tistory.com/473
"1. QuerydslPagingItemReader" 부분쪽을 보면, jojoldu님께서 이미 질문하신 내용이 나와있다.
결론부터 말하자면, JpaPagingItemReader와JpaItemWriter가 각각 다른 entityManager를 사용하고 있기 때문이다.
다시 처음부터 흐름을 디버깅해봅시다.
첫번째 청크에서, reader의 doReadPage()가 호출이 된다.
reader의 entityManager의 value 값은 SessionImpl(1931451243<open>)이다.
메서드가 끝나기 전, 잠깐 멈춰서 영속성컨텍스트(persistenceContext)를 보니
쿼리로 읽어들인 coupon id 5, 4에 해당하는 데이터가 올라와있음을 확인할 수 있다.
조금 건너뛰어서, processor를 거쳐 writer로 오면,
writer의 영속성컨텍스트에는 현재 coupon id 5, 4에 해당하는 데이터, 그리고 각각에 해당하는 coupon_history가 id 1, 2로 올라와있음을 확인할 수 있다.
그런데 여기서 문제는 entityManager의 value 값은 SessionImpl(493176261<open>)으로 reader와 각기 다른 엔티티 매니저를 사용하고 있다.
이게 그럼 무슨 문제인가?
다시 다음 청크를 진행해서 reader의 doReadPage()가 호출된다.
entityManager.flush()를 호출하기 전 잠깐 디버깅을 멈춰보자.
아직 reader의 엔티티 매니저에는, 처음 청크 reader가 호출될 때의 entityManager는 비워지지 않았으니 (clear()가 호출되지 않았음) coupon id 5, 4에 해당하는 객체가 올라가있다.
또한 첫번째 청크를 거치면서 coupon 객체 자체에 자식 엔티티 coupon_history 추가가 된 것을 확인할 수 있다.
그런데, 희한한 것은 각각의 histories의 size가 2이다.
추측컨데, 하나는 discard() 메서드를 호출할 때 실제 List에 객체 하나 add해서 생긴 것(영속화 x), 하나는 writer에서 자식인 histories도 영속화 하면서 실제 list에 객체 하나 더 추가한 것(영속화 o)일 것이라 생각한다.
어찌되었든 중요한 점은 지금 histories에는 이미 DB 에 저장된 영속화된 객체, 영속화 되지 않은 객체 두개가 있다는 점이다.
자 그럼 flush()가 호출되면 어떤일이 벌어지냐면,,,
컬렉션 더티 체킹이 일어나고,
id 가 null이었던, coupon_history들이 영속성컨텍스트에 올라가고, id 값을 채워넣기 위해 insert문이 생성되고 이는 쓰기지연 저장소에 쌓이게 된다.
이 상태에서 entityManager.clear()가 호출되면, PersistenceContext[entityKeys=[], collectionKeys=[]]로 영속성 컨텍스트는 깔끔하게 비워진다.
하지만, 영속성 컨텍스트가 비워진다고 해서 쓰기지연 저장소 SQL문도 함께 날아가는 것은 아니다!
때문에 영속성 컨텍스트가 관리하는 entityKeys는 비워졌지만, 아직 id를 채우기 위해 날라갔던 insert문은 쓰기 지연 저장소에 남아있는 것이다.
그래서...
보다싶이 이 Insert문은 doReadPage()가 끝날때, tx.commit()이 호출되면서 실제 DB에도 날라가게 되는 것이다.
그렇다면 맨 위에서 말한 예제에서, 해당 문제도 설명할 수 있을 것 같다.
... 마지막 청크는 5개 쿠폰에 대한 history는 정상적으로 한개씩만 쌓이고 있다.
이에 대한 원인은 다음 글의 다른 예제와 함께 원인을 기록해보려 한다.