본문 바로가기

스프링 부트/JPA

[Spring Batch] JpaPagingItemReader + JpaItemWriter를 사용했 때겪었던 문제점 (2)

(수정중)
다시한번 언급하지만, 이 글의 전제는 '애초에 이렇게 배치를 구현하면 안된다'로,

JpaPagingItemReader, JpaItemWriter를 가지고 이것 저것 실험하면서 경험한 내용을 정리한 글이다.
앞 글에서 아직 다 다루지 못한 문제가 남아있다. 바로 "마지막 청크는 5개 쿠폰에 대한 history는 정상적으로 한개씩만 쌓이고 있다" 라는 것이다.
(어느정도 감이 왔을 수도 있지만...) 이에 대한 원인은 다른 예제로 설명해보려 한다.

JpaItemWriter에서 Dirty Checking으로 엔티티 변경사항을 DB에 반영할 때 문제점

앞 글에서 발생한 자식엔티티 중복 저장에 대한 문제는 결국 reader와 writer에서 서로 다른 entityManager를 사용하고 있었기 때문이었다. writer에서 em.merge()로 영속화 시킨 자식 엔티티를 다음 청크가 진행될 때 reader에서 또 영속화를 시키니 중복으로 저장되었던 것.
그러면 다른 방법을 (실험삼아) 시도해보자.
엔티티 매니저는 reader의 것만 사용하고, (processor를 제거하고) writer에서 discard()를 호출하고 트랜잭션이 끝날 때 dirty checking이 발생하는 방법으로 해당 배치를 구현하면 어떨까?

@Configuration
class CouponDiscardBatchConfiguration(
    val jobBuilderFactory: JobBuilderFactory,
    val stepBuilderFactory: StepBuilderFactory,
    val entityManagerFactory: EntityManagerFactory,
    @Value("\${chunkSize:$DEFAULT_CHUNK_SIZE}")
    val chunkSize: Int
) {
    private val log = logger()

    @Bean(name = [JOB_NAME])
    fun job(): Job =
        jobBuilderFactory.get(JOB_NAME)
            .start(discardVouchersStep())
            .build()

    @Bean(name = [DISCARD_VOUCHERS_STEP_NAME])
    fun discardVouchersStep(): Step =
        stepBuilderFactory.get(DISCARD_VOUCHERS_STEP_NAME)
            .chunk<Coupon, Coupon>(chunkSize)
            .reader(reader())
            .writer(writer())
            .build()

    @StepScope
    @Bean(name = ["$DISCARD_VOUCHERS_STEP_NAME-reader"])
    fun reader(): JpaPagingItemReader<Coupon> =
        JpaPagingItemReaderBuilder<Coupon>()
            .name("reader")
            .entityManagerFactory(entityManagerFactory)
            .pageSize(chunkSize)
            .queryString("SELECT c FROM Coupon c order by id desc")
            .build()
    
    @Bean
    @StepScope
    fun writer(): ItemWriter<Coupon> =
        ItemWriter { items ->
            items.forEach {
                it.discard()
            }
        }

    companion object {
        const val DEFAULT_CHUNK_SIZE = 10
        const val JOB_NAME = "couponDiscardJob"
        const val DISCARD_VOUCHERS_STEP_NAME = "$JOB_NAME-discardCouponsStep"
    }
}

proccessor에서 상태변경을 하던 discard() 로직을 writer에서 호출하도록 변경해보았다.
그리고 다시 배치를 돌려보면,
(참고로 이전 글과 동일하게 쿠폰 데이터 5개, chunkSize는 2, pageSize도 동일하게 2이다.)

coupon, coupon_history

페이징 쿼리에서 id desc로 정렬하기 때문에 마지막 페이지에 해당하는 id 1에 대해서만 작업이 이루어 질 것이다.
그런데 마지막 페이지, 청크에서 읽어들인 Id 1 쿠폰에 대한 변경사항이 DB에 반영되지 않은 것을 확인할 수 있다.

다시 디버깅을 해보면서 알아봅시다.

먼저 첫번째 첫번째 페이지(첫번째 청크)에서 doRead 메서드가 끝날 시점이다.
영속성 컨텍스트에는 페이지 사이즈만큼 읽어들인 쿠폰 5, 4에 대한 데이터가 올라가 있음을 확인할 수 있다.

itemWriter가 2번 호출된 뒤 쿠폰의 상태가 바뀐 것을 확인할 수 있다.
하지만 아직 콘솔에는 어떠한 insert, update 쿼리도 발생하지 않았다.

아직 위와 같이 select 쿼리만이 존재한다.
다음 페이지, 청크로 넘어가보자.

참고로 두번째 청크에서 Reader의 영속성 컨텍스트의 상태는 위와 같다.
discard()로 인스턴스의 상태를 변경한 것이 반영된 것을 볼 수 있다. 조금 진행을 해보고,

 

entityManager.flush()가 호출된 이 시점, 여기서 자식 엔티티인 history에 대한 insert 문, 쿠폰에 대한 update문이 발생하는 것을 볼 수 있다.
이 쿼리들은 쓰기 지연 SQL 저장소에 쌓이게 되고, doReadPage()에서 커밋이 호출되는 시점에 DB에 쿼리가 날라가서 반영이 된다.

		} else {
			results.addAll(query.getResultList());
			tx.commit(); // 여기서 실제 DB에 쿼리가 날라감
		}//end if


여기서 그럼 알 수 있는 것은 dirty checking으로 엔티티의 변경사항을 반영하고자 이렇게 구현할 경우,
실제 우리가 원하는 엔티티의 변경사항은 해당 청크에서 반영되는 것이 아니라, 다음 청크(페이지)에서 반영이 된다.
그렇다면 마지막 페이지의 변경사항은 그 다음 청크가 없고, 그렇다면 reader에서의 flush가 호출되지 않기 때문에 DB에 변경사항이 반영되지 않는 것이다.

해결 방법

이를 해결하는 방법은 여러가지가 있겠지만, 나는 여기서 얻은 몇가지 깨달음과 함께 다음과 같은 방법으로 구현하였다. 
청크지향의 배치 step을 구현할 경우,

  • reader에서 읽어온 엔티티의 상태를 변경하지 않는다. 
  • reader에서는 id만 읽어오고 (혹은 dto), 이 id를 가지고 writer에서 @Transactional이 달린 Service(클래스 메서드)를 호출하여 원하고자 하는 상태변경을 진행한다. 
  • 여기서, 배치의 트랜잭션과는 별개로, 서비스의 트랜잭션을 따로 사용하기 위해 다음과 같이 배치의 트랜잭션 전파 옵션을 꺼버린다. 
@Bean(name = [DISCARD_VOUCHERS_STEP_NAME])
fun discardVouchersStep(): Step =
    stepBuilderFactory.get(DISCARD_VOUCHERS_STEP_NAME)
        .chunk<Coupon, Coupon>(chunkSize)
        .reader(reader())
        .writer(writer())
        .transactionAttribute(DefaultTransactionAttribute(TransactionDefinition.PROPAGATION_NOT_SUPPORTED)) // 현재 트랜잭션을 지원하지 않고 항상 비 트랜잭션으로 실행합니다.
        .build()

➕ 근데 왜 JpaPagingItemReader에서 트랜잭션을 새로 열고, flush(), clear()를 하는거야

의문을 가진 채로 JpaPagingItemReader를 보다보니, 주석에 어느정도 이유를 적어놓았었다. 직접 공식 문서로 가서 그 내용을 긁어왔다.

공식 문서를 봅시다

https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/database/JpaPagingItemReader.html

 

JpaPagingItemReader (Spring Batch 4.3.6 API)

ItemReader for reading database records built on top of JPA. It executes the JPQL setQueryString(String) to retrieve requested data. The query is executed using paged requests of a size specified in AbstractPagingItemReader.setPageSize(int). Additional pag

docs.spring.io

// ...
In order to reduce the memory usage for large results the persistence context is flushed and cleared after each page is read. This causes any entities read to be detached. If you make changes to the entities and want the changes persisted then you must explicitly merge the entities.
The reader must be configured with an EntityManagerFactory. All entity access is performed within a new transaction, independent of any existing Spring managed transactions.
The implementation is thread-safe in between calls 
// ...

조금 정리를 해보자면,
Jpa의 페이징 동작 방식은 페이지 크기에 해당되는 데이터만을 조회하는 쿼리가 직접 나가는 것이 아니라,
조건에 해당하는 모든 데이터들을 애플리케이션에 올리고, 애플리케이션 단에서 페이징 처리를 진행한다. 
문서에 의하면 이렇게 많은 데이터들을 메모리에 올리며 드는 비용을 줄이기 위해 영속성 컨텍스트는 각 페이지를 읽은 후, 
flush, clear를 호출한다고 한다. 
(개인적인 생각...)
음 메모리에 모두 올린 데이터들을 clear해준다까지는 이해가 가지만, 왜 그 전에 flush를 호출하는지는 아직 의문이다. 
영속성컨텍스트를 clear하기 전에 혹시나 하는 변경사항을 DB에 반영하기 위해서라고 치자. 
그럼 마지막 페이지를 읽고는 다시 reader 호출이 없을 텐데, 이 때 flush가 호출 되지 않으면 마지막 청크의 변경사항만 DB 에 반영되지 않을 수 있다는 점을 염두해둔건지도 궁금하다. 

+ 추가로 생각해보니 애초에 JpaPagingItemReader에서는 chunk 사이즈와는 별개로 pageSize(페이지사이즈)를 지정하고, 지정한 페이지 사이즈만큼만 읽어온다. 즉, 페이지 사이즈는 청크사이즈와는 아무런 연관이 없다는 것이다.
ex) 만약 pageSize=4, chunkSize=2라면, 리더에서 4개의 데이터를 읽어오고 총 2번의 청크가 실행된다. 

이렇게 청크와 페이지라는 것이 독립적인 개념이므로 reader의 트랜잭션이 스프링 배치의 청크와 독립적으로 사용한 것으로 보인다. 우리가 배치에서 청크사이즈와 페이지를 다르게 설정하였을 때 일어날 수 있는 문제점 때문에 같은 값을 사용한 것 뿐이지 이 둘은 독립적인 트랜잭션을 가지고 있는 것이다. 

 추가로 이와 비슷한 문제로 stack overflow에 올라온 몇가지 글들을 남겨놓는다.

https://stackoverflow.com/questions/26509971/spring-batch-jpapagingitemreader-why-some-rows-are-not-read

 

Spring batch jpaPagingItemReader why some rows are not read?

I 'm using Spring Batch(3.0.1.RELEASE) / JPA and an HSQLBD server database. I need to browse an entire table (using paging) and update items (one by one). So I used a jpaPagingItemReader. But when...

stackoverflow.com

https://stackoverflow.com/questions/68315537/spring-batch-transacted-jpapagingitemreader-enitymanager-flush-before-read

 

Spring Batch transacted JpaPagingItemReader enityManager.flush() before read

I have a standard bidirectional parent child relation ship as follows: @Entity public class Parent { @Id private Long id; private String field1; private String field2; @One...

stackoverflow.com