Yebali

Spring event와 Transaction 그리고 @EventListerner, @TransactionalEventListener 본문

Spring

Spring event와 Transaction 그리고 @EventListerner, @TransactionalEventListener

예발이 2024. 11. 8. 18:17

Spring event란 스프링에 데이터를 전달하는 방식 중 하나로

메서드를 직접 호출하는 것이 아니라 이벤트를 발행(Publish)하고 수신(Listen)하는 형태로 사용됩니다.

 

이런 방식을 사용하면 모듈 간 의존성이 낮아지고 관심사를 분리할 수 있다는 단점이 있습니다.

 

구현하는 방법은 일반적으로 ApplicationEventPublisher를 통해 이벤트를 발행하고,

@EventListener, @TransactionalEventListener를 통해 이벤트를 수신합니다.

 

기본적인 사용방법은 이미 아시는 분들이 많을 거라 생각합니다.

이 글에서는 @EventListener, @TransactionalEventListener가 Transaction과 함께 이벤트를 처리할 때 어떻게 동작한는지에 대해 알아보겠습니다.

 

Transaction 추적을 위한 AOP 추가

@Aspect
@Component
class TransactionCommitAspect {
    @Around("@annotation(org.springframework.transaction.annotation.Transactional)")
    fun aroundTransactionalMethod(joinPoint: ProceedingJoinPoint): Any? {
        val transactionId = UUID.randomUUID()

        println("[TransactionCommitAspect] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")
        println("[TransactionCommitAspect] Transaction Start. Transaction ID: $transactionId")

        TransactionSynchronizationManager.registerSynchronization(object : TransactionSynchronization {
            override fun afterCommit() {
                println("[TransactionCommitAspect] Transaction committed successfully. $transactionId")
            }

            override fun afterCompletion(status: Int) {
                println("[TransactionCommitAspect] Transaction completed. $transactionId")
            }
        })

        return joinPoint.proceed()
    }
}

먼저 스레드 정보와 Transaction 정보를 로깅할 수 있도록 했습니다.

스레드는 'Thread.currentThread().threadId()'와 같은 방법으로 정보를 가져올 수 있지만,
Transaction은 구분할 수 있는 유니크한 값을 찾지 못해 AOP를 사용하여 Transaction 시작 전, Commit 후

임의의 Id(transactionId)를  로그를 남기도로 했습니다.

 

참고로 afterCommit()은 Transaction이 성공적으로 commit 되었을 때 호출되는 함수이고, 

afterCompletion()은 commit/rollback 여부에 관계없이 Transaction이 종료되면 호출되는 함수입니다.

 

@EventListener, @TransactionalEventListener 추가

@Component
class SpringEventHandler {
    @EventListener
    fun handleByEventListener(event: EventListenerEvent) {
        println("[EventListener] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")
        println("[EventListener] Error Occurred")
        throw RuntimeException()
    }

    @Async
    @EventListener
    fun handleByAsyncEventListener(event: AsyncEventListenerEvent) {
        println("[AsyncEventListener] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")
        println("[AsyncEventListener] Error Occurred")
        throw RuntimeException()
    }

    @TransactionalEventListener(phase = BEFORE_COMMIT)
    fun handleByTransactionalEventListenerBeforeCommit(event: TransactionalEventListenerBeforeCommitEvent) {
        println("[TransactionalEventListenerBeforeCommit] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")
        println("[TransactionalEventListenerBeforeCommit] Error Occurred")
        throw RuntimeException()
    }

    @Async
    @TransactionalEventListener(phase = BEFORE_COMMIT)
    fun handleByAsyncTransactionalEventListenerBeforeCommit(event: AsyncTransactionalEventListenerBeforeCommitEvent) {
        println("[AsyncTransactionalEventListenerBeforeCommit] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")
        println("[AsyncTransactionalEventListenerBeforeCommit] Error Occurred")
        throw RuntimeException()
    }

    @TransactionalEventListener(phase = AFTER_COMMIT)
    fun handleByTransactionalEventListenerAfterCommit(event: TransactionalEventListenerAfterCommitEvent) {
        println("[TransactionalEventListenerAfterCommit] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")
        println("[TransactionalEventListenerAfterCommit] Error Occurred")
        throw RuntimeException()
    }
}

테스트를 위해 상황별 이벤트 리스너들을 추가해 주었습니다.

리스너들은 모두 동일하게 스레드 정보를 출력하고 Transacntion이 롤백될 수 있게 RuntimeException 예외를 발생시키도록 했습니다.

 

테스트를 위한 @Service 추가

@Service
class SpringEventService(
    private val teamRepository: TeamRepository,
    private val applicationEventPublisher: ApplicationEventPublisher,
) {
    @Transactional
    fun publish(isTransactionalListener: Boolean, isBeforeCommit: Boolean, isAsync: Boolean) {
        println("[Publish] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")

        // Create a new team
        teamRepository.save(
            Team(name = "New Team"),
        )

        when {
            !isTransactionalListener && !isAsync -> applicationEventPublisher.publishEvent(
                EventListenerEvent(),
            )
            !isTransactionalListener && isAsync -> applicationEventPublisher.publishEvent(
                AsyncEventListenerEvent(),
            )
            isBeforeCommit && !isAsync -> applicationEventPublisher.publishEvent(
                TransactionalEventListenerBeforeCommitEvent(),
            )
            isBeforeCommit && isAsync -> applicationEventPublisher.publishEvent(
                AsyncTransactionalEventListenerBeforeCommitEvent(),
            )
            !isBeforeCommit -> applicationEventPublisher.publishEvent(
                TransactionalEventListenerAfterCommitEvent(),
            )

            else -> {}
        }
    }

    fun publishWithoutTx() {
        println("[PublishWithoutTx] Thread Name: ${Thread.currentThread().name}-${Thread.currentThread().threadId()}")

        // Create a new team
        teamRepository.save(
            Team(name = "New Team"),
        )

        applicationEventPublisher.publishEvent(
            EventListenerEvent(),
        )
    }
}

테스트를 위한 @Service를 추가해 주었습니다.

검증을 위해 @Service에서 'Team' 엔티티를 만들어 저장하고, 파라미터에 따라 각각의 이벤트 리스너들이 사용되도록 했습니다.

@Service에서 시작된 Transaciton에 리스너가 참여하면 예외로 인해 'Team'엔티티는 롤백으로 인해 생성되지 못하고

Transaciton에 참여하지 않았다면 'Team'엔티티가 생성되는 결과를 확인할 수 있도록 설계했습니다.

 

'publish()' 메서드는 테스트하고자 하는 이벤트 핸들러가 사용될 수 있도록 아래 세 가지 파라미터를 받아 Event를 발행합니다.

  • isTransacntionalEventListerner: @EventListener or @TransactionalEventListener 여부
  • isBeforeCommit: TransactionPhase.BeforeCommit or TransactionPhase.AfterCommit 여부
  • isAsync: 비동기 실행 여부

'publishWithoutTx()' 메서드는 Transaction이 없는 테스트를 하기 위해 만든 메서드입니다.

이름에서 알 수 있듯 @TransacntionalEventListener의 경우 Transaction이 있어야만 사용할 수 있는 리스너이기 때문에

해당 메서드는 @EventListener만을 테스트하기 위한 메서입니다.

 

테스트 및 결과

@SpringBootTest
@ActiveProfiles("test")
class SpringEventServiceTest {
    @Autowired
    private lateinit var springEventService: SpringEventService

    @Autowired
    private lateinit var teamRepository: TeamRepository

    @BeforeEach
    fun init() {
        teamRepository.deleteAll()
    }

    @Nested
    // @EventListener는 기본적으로는 Tx에 참여하지 않고 동기적으로 바로 호출된다.
    inner class EventListenerTest {
        @Test
        fun `Transaction 밖에서 호출된 @EventListener은 Transacntion에 참여하지 않는다`() {
            runCatching { springEventService.publishWithoutTx() }

            // Tx에 참여하지 않았기 때문에 team이 생성된다.
            Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
        }

        @Test
        fun `Transaction 내에서 호출된 @EventListener은 Transaction에 참여한다`() {
            runCatching { springEventService.publish(isTransactionalListener = false, isBeforeCommit = false, isAsync = false) }

            // Tx 안에서 이벤트를 발행하면 Tx에 참여하게 된다. 따라서 team은 생성되지 않는다.
            Assertions.assertThat(teamRepository.findAll()).isEmpty()
        }

        @Test
        fun `Transaction 내에서 호출된 @Async + @EventListener은 Transaction에 참여하지 않는다`() {
            runCatching { springEventService.publish(isTransactionalListener = false, isBeforeCommit = false, isAsync = true) }

            // Tx은 Thread 종속적이기 떄문에, @Async로 인해 이벤트 핸들링이 다른 스레드에서 일어나 Tx에 참여하지 못한다.
            // 그렇기 때문에 team은 생성된다.
            Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
        }
    }

    @Nested
    inner class TransactionEventListenerTestBeforeCommit {
        @Test
        fun `@TransactionalEventListener(phase = BEFORE_COMMIT)은 Transaction에 참여한다`() {
            runCatching { springEventService.publish(isTransactionalListener = true, isBeforeCommit = true, isAsync = false) }

            // Tx에 참여했기 때문에 team은 생성되지 못한다.
            Assertions.assertThat(teamRepository.findAll()).isEmpty()
        }

        @Test
        fun `@Async + @TransactionalEventListener(phase = BEFORE_COMMIT)은 Transaction에 참여하지 않는다`() {
            runCatching { springEventService.publish(isTransactionalListener = true, isBeforeCommit = true, isAsync = true) }

            // Tx은 Thread 종속적이기 떄문에, @Async로 인해 이벤트 핸들링이 다른 스레드에서 일어나 Tx에 참여하지 못한다.
            // 그렇기 때문에 team은 생성된다.
            Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
        }
    }

    @Nested
    inner class TransactionEventListenerTestAfterCommit {
        @Test
        fun `@TransactionalEventListener(phase = AFTER_COMMIT)은 Transacntion에 참여하지 않는다`() {
            runCatching { springEventService.publish(isTransactionalListener = true, isBeforeCommit = false, isAsync = false) }

            // Tx에 참여하지 않았기 때문에 team은 생성된다.
            Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
        }
    }
}

위와 같은 테스트 코드로 테스트를 진행했습니다.

테스트는 아래 6가지를 테스트했습니다. 위 코드들은 모두 테스트를 통과한 코드입니다.

  • Transaction 밖에서 @EventListener를 사용했을 때 Transaction 참여 여부
  • Transaction 내에서 @EventListener를 사용했을 때 Transaction 참여 여부
  • Transaction 내에서 비동기로 @EventListener를 사용했을 때 Transaction 참여 여부
  • @TransactionalEventListener(phase = BEFORE_COMMIT)의 Transaction 참여 여부
  • 비동기 + @TransactionalEventListener(phase = BEFORE_COMMIT)의 Transaction 참여 여부
  • @TransactionalEventListener(phase =  AFTER_COMMIT)의 Transaction 참여 여부

 

이제 코드와 로그를 하나하나 살펴보겠습니다.

 

Transaction 밖에서 @EventListener를 사용했을 때 Transaction 참여 여부

@Test
fun `Transaction 밖에서 호출된 @EventListener은 Transacntion에 참여하지 않는다`() {
    runCatching { springEventService.publishWithoutTx() }

    // Tx에 참여하지 않았기 때문에 team이 생성된다.
    Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
}

@EventListener는 기본적으로 Transaction에 참여하지 않습니다.

심지어 Transaction 밖에서 호출되었기 때문에 'publishWithoutTx()'에서 생성하는 'Team'엔티티엔 아무런 영향을 미치지 않습니다.

'publishWithoutTx()' 메서드는 @Transacntional 어노테이션도 사용하지 않았기 때문에 AOP 또한 동작하지 않음을 볼 수 있습니다.

 

즉, Transaction 밖에서 사용되는 @EventListener는 Transaction에 참여하지 않고 동일한 스레드에서 실행됩니다.

 

Transaction 내에서 @EventListener를 사용했을 때 Transaction 참여 여부

@Test
fun `Transaction 내에서 호출된 @EventListener은 Transaction에 참여한다`() {
    runCatching { springEventService.publish(isTransactionalListener = false, isBeforeCommit = false, isAsync = false) }

    // Tx 안에서 이벤트를 발행하면 Tx에 참여하게 된다. 따라서 team은 생성되지 않는다.
    Assertions.assertThat(teamRepository.findAll()).isEmpty()
}

@EventListener 기본적으로 Transaction에 참여하지 않지만 Transaction 내에서 리스너가 동작하면 Transaction에 참여합니다.

이것은 이벤트가 발행되면 동기적으로 즉시 해당 리스너가 동작한다는 특징 때문입니다.

로그에서 확인되는 것처럼 동일한 스레드와 Transacntion에서 리스너가 동작했고

Team을 생성하고 커밋하기 전에 예외가 발생한 것을 확인할 수 있습니다. 그렇기 때문에 'Team' 엔티티는 생성되지 않았습니다.

 

즉, Transaction 안에서 사용되는 @EventListener는 Transacntion에 참여하고 동일한 스레드에서 실행됩니다.

 

Transaction 내에서 비동기로 @EventListener를 사용했을 때 Transaction 참여 여부

@Test
fun `Transaction 내에서 호출된 @Async + @EventListener은 Transaction에 참여하지 않는다`() {
    runCatching { springEventService.publish(isTransactionalListener = false, isBeforeCommit = false, isAsync = true) }

    // Tx은 Thread 종속적이기 떄문에, @Async로 인해 이벤트 핸들링이 다른 스레드에서 일어나 Tx에 참여하지 못한다.
    // 그렇기 때문에 team은 생성된다.
    Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
}

Spring의 Transaction의 connection은 ThreadLocal에서 관리됩니다.

즉, 코드가 동작하면 스레드가 달라지면 Transaction에 참여할 수 없다는 의미입니다.

위 테스트에서 @EventListener가 Transacntion 내에서 실행됐지만,

'publish()'를 호출한 'Test worker-1' 스레드와는 다른 'task-1-43' 스레드에서 실행되는 것을 확인할 수 있습니다.

그렇기 때문에 예외는 발생했지만 'Team' 엔티티는 생성되었습니다.

 

즉, Transaction 안에서 사용되는 @EventListener가 비동기로 다른 스레드에서 동작하면 Transaction에 참여하지 않습니다.

 

@TransactionalEventListener(phase = BEFORE_COMMIT)의 Transaction 참여 여부

@Test
fun `@TransactionalEventListener(phase = BEFORE_COMMIT)은 Transaction에 참여한다`() {
    runCatching { springEventService.publish(isTransactionalListener = true, isBeforeCommit = true, isAsync = false) }

    // Tx에 참여했기 때문에 team은 생성되지 못한다.
    Assertions.assertThat(teamRepository.findAll()).isEmpty()
}

 

@TransacntionalEventLister는 Transacntion동작에 따라 리스너의 실행 시점을 선택할 수 있는 어노테이션입니다.

'phase = TransactionPhase.BEFORE_COMMIT'인 경우 Transaction이 Commit 되기 전에 리스너가 동작합니다.

로그에서 확인되는 것처럼 동일한 스레드와 Transacntion에서 리스너가 동작했고

Team을 생성하고 커밋하기 전에 예외가 발생한 것을 확인할 수 있습니다. 그렇기 때문에 'Team' 엔티티는 생성되지 않았습니다.

 

즉, @TransacntionalEventListner(phase = TransactionPhase.BEFORE_COMMIT)는

Transaction에 참여하고 동일한 스레드에서 실행됩니다.

 

비동기 + @TransactionalEventListener(phase = BEFORE_COMMIT)의 Transaction 참여 여부

@Test
fun `@Async + @TransactionalEventListener(phase = BEFORE_COMMIT)은 Transaction에 참여하지 않는다`() {
    runCatching { springEventService.publish(isTransactionalListener = true, isBeforeCommit = true, isAsync = true) }

    // Tx은 Thread 종속적이기 떄문에, @Async로 인해 이벤트 핸들링이 다른 스레드에서 일어나 Tx에 참여하지 못한다.
    // 그렇기 때문에 team은 생성된다.
    Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
}

위에 설명한 것처럼 비동기로 실행되면 스레드가 달라져 Transaction에 참여할 수 없게 됩니다

로그에서 확인되는 것처럼 'publish()'를 호출한 'Test worker-1' 스레드와는 다른 'task-2-44' 스레드에서 실행되는 것을 확인할 수 있습니다.

그렇기 때문에 'Team' 엔티티는 생성되지 않았습니다.

 

즉, @TransacntionalEventListner(phase = TransactionPhase.BEFORE_COMMIT)가 

비동기로 다른 스레드에서 동작하면 Transaction에 참여하지 않습니다.

 

@TransactionalEventListener(phase =  AFTER_COMMIT)의 Transaction 참여 여부

@Test
fun `@TransactionalEventListener(phase = AFTER_COMMIT)은 Transacntion에 참여하지 않는다`() {
    runCatching { springEventService.publish(isTransactionalListener = true, isBeforeCommit = false, isAsync = false) }

    // Tx에 참여하지 않았기 때문에 team은 생성된다.
    Assertions.assertThat(teamRepository.findAll()).isNotEmpty()
}

'phase = TransactionPhase.BEFORE_COMMIT'인 경우 Transaction이 Commit 된 후에 리스너가 동작합니다.

로그에서 확인되는 것처럼 Transacntion 종료된 후 리스너가 동작합니다.

그렇기 때문에 'Team'엔티티는 생성됩니다.

 

즉, @TransacntionalEventListner(phase = TransactionPhase.BEFORE_COMMIT)는 Transaction에 참여하지 않습니다.

 

마무리

역시 궁금한건 직접 해보는 게 가장 좋은 방법인 것 같습니다.

감사합니다.