개발 일지

Spring Webflux with EventListener

북극곰은콜라 2024. 3. 20. 20:33
반응형

 


개요

Spring framework에서 제공하는 eventListener 기능 설명 (4.2 이상 기준)
Webflux 환경에서 EventListener에 대한 Sample Code 제시

 


동작원리

Spring @EventListener

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Reflective
public @interface EventListener {
  @AliasFor("classes")
  Class<?>[] value() default {};

  @AliasFor("value")
  Class<?>[] classes() default {};

  String condition() default "";

  String id() default "";
}
Method를 target을 하는 Spring Annotation이다.
classes: handle Class 지정
condition: SqEL로 표현되는 조건 String
id: id부여
value == classes

ApplicationEventPublisher

@FunctionalInterface
public interface ApplicationEventPublisher {
  default void publishEvent(ApplicationEvent event) {
    this.publishEvent((Object)event);
  }

  void publishEvent(Object event);
}
Functional Interface로 event 발행을 위한 기능을 정의했다.
Spring은 해당 interface의 구현체를 Bean으로 관리하며, 내부 spring event publish 용으로 사용한다.
내부적으로 ApplicationEvent 객체를 통해 전파한다.

ApplicationContext

Spring Application이 기동 할 때 생성되는 Context는 기본적으로 ApplicationEventPublisher를 상속받는다.
그 외 EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory, MessageSource, ResourcePatternResolver 등 interface도 상속받는다.

AbstractApplicationContext

@Override
public void publishEvent(ApplicationEvent event) {
	publishEvent(event, null);
}

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
	Assert.notNull(event, "Event must not be null");

	// Decorate event as an ApplicationEvent if necessary
	ApplicationEvent applicationEvent;
	if (event instanceof ApplicationEvent) {
		applicationEvent = (ApplicationEvent) event;
	}
	else {
		applicationEvent = new PayloadApplicationEvent<>(this, event);
		if (eventType == null) {
			eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
		}
	}

	// Multicast right now if possible - or lazily once the multicaster is initialized
	if (this.earlyApplicationEvents != null) {
		this.earlyApplicationEvents.add(applicationEvent);
	}
	else {
		getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
	}

	// Publish event via parent context as well...
	if (this.parent != null) {
		if (this.parent instanceof AbstractApplicationContext) {
			((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
		}
		else {
			this.parent.publishEvent(event);
		}
	}
}
EventPublisher의 publishEvent(…)를 구현한 abstract class이다.
4.2부터 ApplicationEvent를 상속받지 않아도 event를 publish 할 수 있다.
이는 object를 PayloadApplicationEvent로 event(Object)를 한번 감싸기 때문이다.
사용자가 publish한 이벤트는 보통 PayloadApplicationEvent로 multicater의 multicastEvent(…)에 호출된다.

ApplicationEventMulticaster

Application에서 구현된 Listener들을 execute 하는 Interface
일반적으로 SimpleApplicationEventMulticaster 구현체가 사용된다.

SimpleApplicationEventMulticaster

@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
	ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
	Executor executor = getTaskExecutor();
	for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
		if (executor != null) {
			executor.execute(() -> invokeListener(listener, event));
		}
		else {
			invokeListener(listener, event);
		}
	}
}
multicastEvent는 event에 맞는 ApplicationListener를 모두 순회하며 invoke 시킨다.
executor가 있으면 executor를 통해, 없으면 sync하게 invoke 된다.
executor는 AsyncConfigurer + @EnableAsync로 생성할 수 있다.
Listener는 Event class 및 resolver에 따라서 retrive 및 caching 된다.

 


Webflux Event Sample

문제상황

userService
  .getUser(req)
  .flatMap(userService::changePassword)
  .doOnNext(user -> 
    produceManager
      .send(user)
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
  )
  ...
일반적으로 Webflux(Projectreactor)에서 별도의 Scheduler로 비동기처리를 위해서는
위와 같이 subscribeOn으로 scheduler를 지정하고 subscribe 별도로 호출한다.
하지만 business logic에 subscribe()를 호출하는 코드는 문제점을 가진다.
1. 가독성도 떨어진다.
2. 비동기로 실행되어, 의도되지 않은 코드에 대해 인지를 못할 수 있다.
3. 코드가 비대 해질수록 subscribe가 증가한다. 그에 따라 debug point가 증가하게 된다.

솔루션 제안

Spring Context의 EventListener를 Async 하게 사용한다.
이는 Event에 대한 Subscriber, Subscribe logic, Scheduler 등 비즈니스와 분리할 수 있다.
결론적으로 Event처리 비즈니스의 응집도를 높일 수 있다.

Event할당 처리를 위한 Async Executor 설정

@Configuration
@EnableAsync
public class Config implements AsyncConfigurer {
  @Override
  public Executor getAsyncExecutor() {
    return Executors.newFixedThreadPool(
        Math.max(1, Runtime.getRuntime().availableProcessors()),
        new DefaultThreadFactory("async-event"));
  }

  @Override
  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new SimpleAsyncUncaughtExceptionHandler();
  }
}
일반적인 processor 기반 fixed thread pool을 만든다.
해당 Executor는 listener를 통한 event의 invoke를 담당한다.

Event Handler 작성

@Component
public class EventHandler {
  private final WebClient webClient = WebClient.builder().build();

  @Async
  @EventListener
  public void handleEvent(final Event event) throws InterruptedException {
    // 비동기 처리 확인을 위한 임시 sleep
    Thread.sleep(1000);
    Mono.just(event.getPath())
        .doOnNext(path -> log.info("publish path: {}", path))
        .flatMap(path -> webClient
            .get()
            .uri("http://localhost:12345" + path)
            .retrieve()
            .bodyToMono(HashMap.class))
        .subscribe();
  }
}
Bean에 @Async, @EventListener를 통해 handler method를 작성한다.
Annotation 기반이기 때문에, 상대적으로 자연스럽게 handler method를 관리할 수 있다.

Business Example

@Service
@RequiredArgsConstructor
@Slf4j
public class BusinessService {
  private final ApplicationEventPublisher applicationEventPublisher;

  public void doSomething() {
    Mono.just(new Event("/1"))
        .doOnNext(applicationEventPublisher::publishEvent)
        .map(ev -> new Event("/2"))
        .doOnNext(applicationEventPublisher::publishEvent)
        .map(ev -> new Event("/3"))
        .doOnNext(applicationEventPublisher::publishEvent)
        .map(ev -> new Event("/4"))
        .doOnNext(applicationEventPublisher::publishEvent)
        .doOnNext(ev -> log.info("doSomething end"))
        .subscribe();
  }
}

Log

[           main] o.e.eventwebflux.send.BusinessService    : doSomething end
[async-event-3-3] o.e.eventwebflux.send.EventHandler       : publish path: /3
[async-event-3-1] o.e.eventwebflux.send.EventHandler       : publish path: /1
[async-event-3-2] o.e.eventwebflux.send.EventHandler       : publish path: /2
[async-event-3-4] o.e.eventwebflux.send.EventHandler       : publish path: /4
상대적으로 깔끔하게 비동기 Event 처리를 구현할 수 있다.
async executor에서 처리됨을 확인할 수 있다.

 


REFERENCE

https://github.com/spring-projects/spring-framework/issues/21025

https://github.com/spring-projects/spring-framework/issues/21831

 

 

반응형

'개발 일지' 카테고리의 다른 글

ksqlDB 란  (0) 2024.05.27
Java Reactive Streams Publisher / Subscriber 분석 (projectreactor)  (0) 2024.03.26
[Kafka] Parallel Consumer  (0) 2024.03.19
TURN Protocol (+ STUN Message)  (1) 2024.03.18
ProjectReactor Case Study  (0) 2024.02.22