개발 일지

Spring Webflux Threading (publishOn, subscribeOn) 분석

북극곰은콜라 2023. 10. 30. 19:56
반응형


개요

Reactor에서 제공하는 Pub / Sub 모델은 실행될 Thread를 전략적으로 선택할 수 있게 interface를 제공한다.
 - publishOn(Scheduler scheduler): 어디에서 Publish 될 것인가
 - subscribeOn(Scheduler scheduler): 어디에서 subscribe 할 것인가

 


Scheduler

interface Scheduler extends Disposable {
    
  Disposable schedule(Runnable task);
  Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
  Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  
  long now(TimeUnit unit);
  
  Worker createWorker();
  
  interface Worker extends Disposable {
    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  }
}
Scheduler는 사용자에게 threading을 효율적으로 제어할 수 있도록 설계된 interface이다.
Worker는 Scheduler에 의해 create되며, 역할은 task를 execute 하는 것이다.
일반적으로 구현체는 각각 용도에 맞는 ExecutorService 또는 그 reference를 가지고 있으며
해당 executorService를 활용하여 Thread를 구현로직에 따라 Execute한다. (일반적으로 스케쥴링을 위한 ScheduledExecutorService를 가지고 있다.)

Default Schedulers

Project Reactor에서는 기 구현된 클래스들을 제공한다.
이는 Schedulers 클래스의 static 메서드로 제공 받을 수 있다.

주요 Scheduler

ImmediateScheduler:
 - Thread 변경 없는 scheduler → ExecutorService를 쓰지 않는다.
 - interface상 scheduler가 notNull일 시 사용
singleScheduler:
 - Thread 하나 짜리 executorService에서 돌아가는 scheduler, 한번에 하나만 실행될 작업에 적합
ParallelScheduler:
 - Parall한 작업을 수행하기 위한 scheduler, 다수의 작업을 짧게 실행하기에 적합하다.
 - I/O등 작업을 수행하기 부적합. (parall Thread는 기본적으로 CPU 갯수)
 - 기본적인 Reactor 연산의 기본값 (reactor-http-nio 등)
BoundedElasticScheduler:
 - ExecutorService에서 Worker를 생성하고, 재사용
 - I/O 작업 등 시간이 걸리는 작업을 수행하기 적합
 - worker는 1분 이상 idle 되면 제거
ExecutorScheduler:
 - custom executor를 세팅할 수 있는 scheduler
 - 개발자는 반드시 executorService로 lifeCycle 관리를 해야 함

 


Operator: PublishOn(), SubscribeOn()

Reactor에서 Scheduler를 적용할 수 있게 제공하는 method 이다.
 - publishOn(): Source로부터 이하 로직이 실행 될 Scheduler를 지정할 수 있는 메서드 subscribeOn(): Source부터   - Scheduler가 지정되지 않은 upStream의 시작점부터 Scheduler를 적용할 수 있는 메서드

PublishOn()

public static void main(String[] args) {
  Mono.just("pubEx")
      .doOnNext(body -> log.info("1"))
      .publishOn(Schedulers.newSingle("single1"))
      .doOnNext(body -> log.info("2"))
      .publishOn(Schedulers.parallel())
      .doOnNext(body -> log.info("3"))
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(body -> log.info("4"))
      .subscribe();
}

09:47:37.881 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
09:47:37.886 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
09:47:37.887 [parallel-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
09:47:37.888 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
최초 실행 쓰레드는 main으로 1번 로그는 main Thread에서 실행된다.
2, 3, 4는 각각 doOnNext 전 Scheduler를 지정하여 실행된 것을 확인할 수 있다.

SubscribeOn()

public static void main(String[] args) {
  Mono.just("subEx")
      .doOnNext(body -> log.info("1"))
      .doOnNext(body -> log.info("2"))
      .subscribeOn(Schedulers.newSingle("single1"))
      .doOnNext(body -> log.info("3"))
      .subscribeOn(Schedulers.parallel())
      .doOnNext(body -> log.info("4"))
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe();
}

10:18:58.911 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:18:58.914 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:18:58.914 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:18:58.914 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
SubscribeOn은 PublishOn과 다르게, Scheduler가 적용되지 않은 upStream에 Scheduler를 등록한다.
따라서 최초의 newSingle() 스케쥴러는 적용되었지만, 이하 parallel, boundedElastic은 적용되지 못 했다.
자세한 사항은 후술
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
  return serverRequest.bodyToMono(HashMap.class)
      .flatMap(x -> Mono.just(x)
          .subscribeOn(Schedulers.boundedElastic())
          .doOnNext(body -> log.info(Thread.currentThread().getName()))
          .doOnNext(body -> log.info("1"))
          .doOnNext(body -> log.info("2")))
      .flatMap(x -> ServerResponse.ok().build());
}

2023-10-26T14:04:40.575+09:00  INFO 10540 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : boundedElastic-1
2023-10-26T14:04:40.575+09:00  INFO 10540 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : 1
2023-10-26T14:04:40.575+09:00  INFO 10540 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : 2
scheduler 적용 예시

Scheduler 적용/미적용 비교

public static void main(String[] args) {
    Flux.fromIterable(List.of("1","2","3","4"))
        .doOnNext(TestRouter::wait1000)
        .subscribe();
    Flux.fromIterable(List.of("A","B","C","D"))
        .doOnNext(TestRouter::wait1000)
        .subscribe();
  }

10:00:06.700 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:00:07.712 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:00:08.720 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:00:09.726 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
10:00:10.737 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- A
10:00:11.741 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- B
10:00:12.752 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- C
10:00:13.765 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- D
1초씩 딜레이를 줘서 로깅하는 로직이 실행 된다.
1번 Subscribe가 종료되어야, 2번 subscribe가 진행되는 것을 확인할 수 있다.
이는 내부 로직이 main Thread에서 진행되기 때문에, main이 blocking으로 동작하여, 1번 subscribe line이 끝나야 2번 subscribe line이 실행되기 때문
public static void main(String[] args) {
    Flux.fromIterable(List.of("1","2","3","4"))
        .publishOn(Schedulers.boundedElastic())
        .doOnNext(TestRouter::wait1000)
        .subscribe();
    Flux.fromIterable(List.of("A","B","C","D"))
        .publishOn(Schedulers.boundedElastic())
        .doOnNext(TestRouter::wait1000)
        .subscribe();
  }

10:02:43.690 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- A
10:02:43.690 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:02:44.704 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- B
10:02:44.704 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:02:45.714 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:02:45.714 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- C
10:02:46.727 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- D
10:02:46.727 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
Scheduler 적용 시 이하 프로세스는 Scheduler에서 관리되며 main Thread의 경우 다음 line으로 넘어간다.
Scheduler는 각각의 Thread에서 로직을 executorService를 통해서 실행하며, 서로 간섭하지 않는다.

PublishOn() VS SubscribeOn()

 
public static void main(String[] args) {
  Mono.just("pub vs sub")
      .doOnNext(body -> log.info("1"))
      .publishOn(Schedulers.newSingle("pub"))
      .doOnNext(body -> log.info("2"))
      .publishOn(Schedulers.parallel())
      .doOnNext(body -> log.info("3"))
      .subscribeOn(Schedulers.newSingle("sub"))
      .doOnNext(body -> log.info("4"))
      .subscribe();
}

10:28:30.634 [sub-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:28:30.638 [pub-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:28:30.638 [parallel-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:28:30.638 [parallel-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
1번 로직은 본래 main 쓰레드에서 execute 했었지만 .subscribeOn(Schedulers.newSingle("sub")) 으로 인하여 sub(single)에서 execute되었다.
2번은 .publishOn(Schedulers.newSingle("pub")) 으로 pub(single)에서 실행 되었으며
3,4번은 다시 parallel로 변경 된 상황이다.

 


Conclusion

Schedule in Webflux

정리하면 PubOn과 SubOn은 주도권을 누가 가져갈 것 인가에 대한 차이이다.
 - PubOn은 이하 로직은 내가 주도권을 가지고 Schedule을 지정 하는 것
 - SubOn은 수동적으로 Schedule을 지정하는 것 이다.
가령 일반적으로 http 요청을 받아 response를 주는 과정은 Webflux에서 주도권을 가지고 Schedule을 지정한다.
따라서 HttpHandle 과정 request → executeHandler → (handle) → handleResult → response는 reactor의 schedule(reactor-http-nio-x)에서 처리하게 된다.
해당 스케쥴은 netty(or jetty, undertow, etc…) 에서 관리되는 executor를 사용하고 있다.
따라서 capacity가 유한하며, 서버 기동 시 설정에 의해 결정된다.
// 주의!
reactor-http 쪽 WorkerPool 또한 유한하기 때문에, I/O 작업 등 기타 시간이 걸리는 작업은 최대한 다른 Scheduler에서 실행되는 것이 바람직하다.
ex) DB 작업은 r2dbc에서 관리되는 scheduler로 실행되게 flatmap해서 사용해야, http-netty-connector가 효율적으로 request를 accept 할 수 있다. 그렇기 때문에, R2DBC 또는 reactive하지 않은 방식으로 DB를 사용할 경우 오히려 servlet 방식보다 성능이 떨어질 수 있다.

public Mono<ServerResponse> test(final ServerRequest serverRequest) {
  return serverRequest.bodyToMono(HashMap.class)
      .doOnNext(body -> log.info(Thread.currentThread().getName()))
      .doOnNext(body -> log.info("1"))
      .doOnNext(body -> log.info("2"))
      .subscribeOn(Schedulers.boundedElastic())
      .flatMap(x -> ServerResponse.ok().build());
}

2023-10-26T13:11:29.742+09:00  INFO 10884 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : reactor-http-nio-3
2023-10-26T13:11:29.742+09:00  INFO 10884 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : 1
2023-10-26T13:11:29.742+09:00  INFO 10884 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : 2
netty에서 이미 Scheduler를 지정했기 때문에 subscribeOn이 적용되지 못한 상황 예시

Schedule 활용 예시

1. Webflux Request handling 최적화

public Mono<ServerResponse> test(final ServerRequest serverRequest) {
  return serverRequest.bodyToMono(HashMap.class)
      .doOnNext(x -> wait1000("1"))
      .doOnNext(x -> wait1000("2"))
      .doOnNext(x -> wait1000("3"))
      .flatMap(x -> ServerResponse.ok().build());
}

2023-10-26T13:51:49.133+09:00  INFO 19628 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : 1: reactor-http-nio-3
2023-10-26T13:51:50.144+09:00  INFO 19628 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : 2: reactor-http-nio-3
2023-10-26T13:51:51.160+09:00  INFO 19628 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : 3: reactor-http-nio-3
// Scheduler 적용
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
  return serverRequest.bodyToMono(HashMap.class)
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(x -> wait1000("1"))
      .doOnNext(x -> wait1000("2"))
      .doOnNext(x -> wait1000("3"))
      .flatMap(x -> ServerResponse.ok().build());
}

2023-10-26T13:53:37.177+09:00  INFO 24192 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : 1: boundedElastic-1
2023-10-26T13:53:38.187+09:00  INFO 24192 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : 2: boundedElastic-1
2023-10-26T13:53:39.200+09:00  INFO 24192 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : 3: boundedElastic-1
Webflux에서 시간이 오래 소요 될 것 같은 작업은 publishOn을 통해 reactor Thread를 release

2. 여러 작업을 동시에 execute 할 때

 
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
  return serverRequest.bodyToMono(HashMap.class)
      .doOnNext(x -> Mono.just(x)
          .subscribeOn(Schedulers.boundedElastic())
          .doOnNext(xx -> wait2000("1"))
          .subscribe())
      .doOnNext(x -> Mono.just(x)
          .subscribeOn(Schedulers.boundedElastic())
          .doOnNext(xx -> wait1000("2"))
          .subscribe())
      .doOnNext(x -> Mono.just(x)
          .subscribeOn(Schedulers.boundedElastic())
          .doOnNext(xx -> wait1000("3"))
          .subscribe())
      .doOnNext(body -> log.info("last"))
      .flatMap(x -> ServerResponse.ok().build());
}

2023-10-26T14:10:30.927+09:00  INFO 15004 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter      : last
2023-10-26T14:10:31.930+09:00  INFO 15004 --- [oundedElastic-6] c.h.i.s.n.s.inbound.rest.TestRouter      : 3: boundedElastic-6
2023-10-26T14:10:31.930+09:00  INFO 15004 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter      : 2: boundedElastic-1
2023-10-26T14:10:32.933+09:00  INFO 15004 --- [oundedElastic-4] c.h.i.s.n.s.inbound.rest.TestRouter      : 1: boundedElastic-4
별도의 scheduler를 설정해서 subscribe를 하는 경우
메인 flow 외 별도의 flow로 consume을 하게 된다.
이 상황에 Scheduler를 설정하면 손쉽게 해당 flow를 별도의 schedule로 동작하게 할 수 있다.

3. 전역 또는 Scoped에서 한번에 하나만 실행

@Scheduled("...")
public void batch1_1() {
  Mono.just("1")
      .publishOn(Schedulers.single())
      .doOnNext(TestRouter::wait1000)
      .subscribe();
}
@Scheduled("...")
public void batch1_2() {
  Mono.just("2")
      .publishOn(Schedulers.single())
      .doOnNext(TestRouter::wait1000)
      .subscribe();
}
위 상황처럼 배치가 여러개 돌아갈 때
한번에 하나씩만 수행하도록 하고 싶다면, single() 스케쥴러를 통해 스케쥴링 하면 된다.
이러면 batch1_1과 batch1_2가 동시에 수행되는 것을 방지할 수 있다.

 


요약

 - Reactor진영에서는 Scheduler를 통해 로직이 실행 될 Thread 환경을 지정할 수 있다.
 - 로직의 실행 Thread에 대한 주도권을 가질 땐 PublishOn, 로직을 subscribe할 때 default를 지정 할 목적이면 subscribeOn
 - Webflux에서 I/O등 시간이 걸리는 작업은 scheduler를 지정하여(elastic 추천) 수행

 


REFERENCE

 

 

 

반응형