반응형
개요
Reactor에서 제공하는 Pub / Sub 모델은 실행될 Thread를 전략적으로 선택할 수 있게 interface를 제공한다.
- publishOn(Scheduler scheduler): 어디에서 Publish 될 것인가
- subscribeOn(Scheduler scheduler): 어디에서 subscribe 할 것인가
Scheduler
interface Scheduler extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
long now(TimeUnit unit);
Worker createWorker();
interface Worker extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
}
}
Scheduler는 사용자에게 threading을 효율적으로 제어할 수 있도록 설계된 interface이다.
Worker는 Scheduler에 의해 create되며, 역할은 task를 execute 하는 것이다.
일반적으로 구현체는 각각 용도에 맞는 ExecutorService 또는 그 reference를 가지고 있으며
해당 executorService를 활용하여 Thread를 구현로직에 따라 Execute한다. (일반적으로 스케쥴링을 위한 ScheduledExecutorService를 가지고 있다.)
Default Schedulers
Project Reactor에서는 기 구현된 클래스들을 제공한다.
이는 Schedulers 클래스의 static 메서드로 제공 받을 수 있다.
주요 Scheduler
ImmediateScheduler:
- Thread 변경 없는 scheduler → ExecutorService를 쓰지 않는다.
- interface상 scheduler가 notNull일 시 사용
singleScheduler:
- Thread 하나 짜리 executorService에서 돌아가는 scheduler, 한번에 하나만 실행될 작업에 적합
ParallelScheduler:
- Parall한 작업을 수행하기 위한 scheduler, 다수의 작업을 짧게 실행하기에 적합하다.
- I/O등 작업을 수행하기 부적합. (parall Thread는 기본적으로 CPU 갯수)
- 기본적인 Reactor 연산의 기본값 (reactor-http-nio 등)
BoundedElasticScheduler:
- ExecutorService에서 Worker를 생성하고, 재사용
- I/O 작업 등 시간이 걸리는 작업을 수행하기 적합
- worker는 1분 이상 idle 되면 제거
ExecutorScheduler:
- custom executor를 세팅할 수 있는 scheduler
- 개발자는 반드시 executorService로 lifeCycle 관리를 해야 함
Operator: PublishOn(), SubscribeOn()
Reactor에서 Scheduler를 적용할 수 있게 제공하는 method 이다.
- publishOn(): Source로부터 이하 로직이 실행 될 Scheduler를 지정할 수 있는 메서드 subscribeOn(): Source부터 - Scheduler가 지정되지 않은 upStream의 시작점부터 Scheduler를 적용할 수 있는 메서드
PublishOn()
public static void main(String[] args) {
Mono.just("pubEx")
.doOnNext(body -> log.info("1"))
.publishOn(Schedulers.newSingle("single1"))
.doOnNext(body -> log.info("2"))
.publishOn(Schedulers.parallel())
.doOnNext(body -> log.info("3"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(body -> log.info("4"))
.subscribe();
}
09:47:37.881 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
09:47:37.886 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
09:47:37.887 [parallel-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
09:47:37.888 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
최초 실행 쓰레드는 main으로 1번 로그는 main Thread에서 실행된다.
2, 3, 4는 각각 doOnNext 전 Scheduler를 지정하여 실행된 것을 확인할 수 있다.
SubscribeOn()
public static void main(String[] args) {
Mono.just("subEx")
.doOnNext(body -> log.info("1"))
.doOnNext(body -> log.info("2"))
.subscribeOn(Schedulers.newSingle("single1"))
.doOnNext(body -> log.info("3"))
.subscribeOn(Schedulers.parallel())
.doOnNext(body -> log.info("4"))
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
10:18:58.911 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:18:58.914 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:18:58.914 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:18:58.914 [single1-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
SubscribeOn은 PublishOn과 다르게, Scheduler가 적용되지 않은 upStream에 Scheduler를 등록한다.
따라서 최초의 newSingle() 스케쥴러는 적용되었지만, 이하 parallel, boundedElastic은 적용되지 못 했다.
자세한 사항은 후술
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
return serverRequest.bodyToMono(HashMap.class)
.flatMap(x -> Mono.just(x)
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(body -> log.info(Thread.currentThread().getName()))
.doOnNext(body -> log.info("1"))
.doOnNext(body -> log.info("2")))
.flatMap(x -> ServerResponse.ok().build());
}
2023-10-26T14:04:40.575+09:00 INFO 10540 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : boundedElastic-1
2023-10-26T14:04:40.575+09:00 INFO 10540 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : 1
2023-10-26T14:04:40.575+09:00 INFO 10540 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : 2
scheduler 적용 예시
Scheduler 적용/미적용 비교
public static void main(String[] args) {
Flux.fromIterable(List.of("1","2","3","4"))
.doOnNext(TestRouter::wait1000)
.subscribe();
Flux.fromIterable(List.of("A","B","C","D"))
.doOnNext(TestRouter::wait1000)
.subscribe();
}
10:00:06.700 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:00:07.712 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:00:08.720 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:00:09.726 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
10:00:10.737 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- A
10:00:11.741 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- B
10:00:12.752 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- C
10:00:13.765 [main] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- D
1초씩 딜레이를 줘서 로깅하는 로직이 실행 된다.
1번 Subscribe가 종료되어야, 2번 subscribe가 진행되는 것을 확인할 수 있다.
이는 내부 로직이 main Thread에서 진행되기 때문에, main이 blocking으로 동작하여, 1번 subscribe line이 끝나야 2번 subscribe line이 실행되기 때문
public static void main(String[] args) {
Flux.fromIterable(List.of("1","2","3","4"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(TestRouter::wait1000)
.subscribe();
Flux.fromIterable(List.of("A","B","C","D"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(TestRouter::wait1000)
.subscribe();
}
10:02:43.690 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- A
10:02:43.690 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:02:44.704 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- B
10:02:44.704 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:02:45.714 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:02:45.714 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- C
10:02:46.727 [boundedElastic-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- D
10:02:46.727 [boundedElastic-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
Scheduler 적용 시 이하 프로세스는 Scheduler에서 관리되며 main Thread의 경우 다음 line으로 넘어간다.
Scheduler는 각각의 Thread에서 로직을 executorService를 통해서 실행하며, 서로 간섭하지 않는다.
PublishOn() VS SubscribeOn()
public static void main(String[] args) {
Mono.just("pub vs sub")
.doOnNext(body -> log.info("1"))
.publishOn(Schedulers.newSingle("pub"))
.doOnNext(body -> log.info("2"))
.publishOn(Schedulers.parallel())
.doOnNext(body -> log.info("3"))
.subscribeOn(Schedulers.newSingle("sub"))
.doOnNext(body -> log.info("4"))
.subscribe();
}
10:28:30.634 [sub-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 1
10:28:30.638 [pub-2] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 2
10:28:30.638 [parallel-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 3
10:28:30.638 [parallel-1] INFO com.ht.iot.smr.node.sample.inbound.rest.TestRouter -- 4
1번 로직은 본래 main 쓰레드에서 execute 했었지만 .subscribeOn(Schedulers.newSingle("sub")) 으로 인하여 sub(single)에서 execute되었다.
2번은 .publishOn(Schedulers.newSingle("pub")) 으로 pub(single)에서 실행 되었으며
3,4번은 다시 parallel로 변경 된 상황이다.
Conclusion
Schedule in Webflux
정리하면 PubOn과 SubOn은 주도권을 누가 가져갈 것 인가에 대한 차이이다.
- PubOn은 이하 로직은 내가 주도권을 가지고 Schedule을 지정 하는 것
- SubOn은 수동적으로 Schedule을 지정하는 것 이다.
가령 일반적으로 http 요청을 받아 response를 주는 과정은 Webflux에서 주도권을 가지고 Schedule을 지정한다.
따라서 HttpHandle 과정 request → executeHandler → (handle) → handleResult → response는 reactor의 schedule(reactor-http-nio-x)에서 처리하게 된다.
해당 스케쥴은 netty(or jetty, undertow, etc…) 에서 관리되는 executor를 사용하고 있다.
따라서 capacity가 유한하며, 서버 기동 시 설정에 의해 결정된다.
// 주의!
reactor-http 쪽 WorkerPool 또한 유한하기 때문에, I/O 작업 등 기타 시간이 걸리는 작업은 최대한 다른 Scheduler에서 실행되는 것이 바람직하다.
ex) DB 작업은 r2dbc에서 관리되는 scheduler로 실행되게 flatmap해서 사용해야, http-netty-connector가 효율적으로 request를 accept 할 수 있다. 그렇기 때문에, R2DBC 또는 reactive하지 않은 방식으로 DB를 사용할 경우 오히려 servlet 방식보다 성능이 떨어질 수 있다.
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
return serverRequest.bodyToMono(HashMap.class)
.doOnNext(body -> log.info(Thread.currentThread().getName()))
.doOnNext(body -> log.info("1"))
.doOnNext(body -> log.info("2"))
.subscribeOn(Schedulers.boundedElastic())
.flatMap(x -> ServerResponse.ok().build());
}
2023-10-26T13:11:29.742+09:00 INFO 10884 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : reactor-http-nio-3
2023-10-26T13:11:29.742+09:00 INFO 10884 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : 1
2023-10-26T13:11:29.742+09:00 INFO 10884 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : 2
netty에서 이미 Scheduler를 지정했기 때문에 subscribeOn이 적용되지 못한 상황 예시
Schedule 활용 예시
1. Webflux Request handling 최적화
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
return serverRequest.bodyToMono(HashMap.class)
.doOnNext(x -> wait1000("1"))
.doOnNext(x -> wait1000("2"))
.doOnNext(x -> wait1000("3"))
.flatMap(x -> ServerResponse.ok().build());
}
2023-10-26T13:51:49.133+09:00 INFO 19628 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : 1: reactor-http-nio-3
2023-10-26T13:51:50.144+09:00 INFO 19628 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : 2: reactor-http-nio-3
2023-10-26T13:51:51.160+09:00 INFO 19628 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : 3: reactor-http-nio-3
// Scheduler 적용
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
return serverRequest.bodyToMono(HashMap.class)
.publishOn(Schedulers.boundedElastic())
.doOnNext(x -> wait1000("1"))
.doOnNext(x -> wait1000("2"))
.doOnNext(x -> wait1000("3"))
.flatMap(x -> ServerResponse.ok().build());
}
2023-10-26T13:53:37.177+09:00 INFO 24192 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : 1: boundedElastic-1
2023-10-26T13:53:38.187+09:00 INFO 24192 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : 2: boundedElastic-1
2023-10-26T13:53:39.200+09:00 INFO 24192 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : 3: boundedElastic-1
Webflux에서 시간이 오래 소요 될 것 같은 작업은 publishOn을 통해 reactor Thread를 release
2. 여러 작업을 동시에 execute 할 때
public Mono<ServerResponse> test(final ServerRequest serverRequest) {
return serverRequest.bodyToMono(HashMap.class)
.doOnNext(x -> Mono.just(x)
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(xx -> wait2000("1"))
.subscribe())
.doOnNext(x -> Mono.just(x)
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(xx -> wait1000("2"))
.subscribe())
.doOnNext(x -> Mono.just(x)
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(xx -> wait1000("3"))
.subscribe())
.doOnNext(body -> log.info("last"))
.flatMap(x -> ServerResponse.ok().build());
}
2023-10-26T14:10:30.927+09:00 INFO 15004 --- [ctor-http-nio-3] c.h.i.s.n.s.inbound.rest.TestRouter : last
2023-10-26T14:10:31.930+09:00 INFO 15004 --- [oundedElastic-6] c.h.i.s.n.s.inbound.rest.TestRouter : 3: boundedElastic-6
2023-10-26T14:10:31.930+09:00 INFO 15004 --- [oundedElastic-1] c.h.i.s.n.s.inbound.rest.TestRouter : 2: boundedElastic-1
2023-10-26T14:10:32.933+09:00 INFO 15004 --- [oundedElastic-4] c.h.i.s.n.s.inbound.rest.TestRouter : 1: boundedElastic-4
별도의 scheduler를 설정해서 subscribe를 하는 경우
메인 flow 외 별도의 flow로 consume을 하게 된다.
이 상황에 Scheduler를 설정하면 손쉽게 해당 flow를 별도의 schedule로 동작하게 할 수 있다.
3. 전역 또는 Scoped에서 한번에 하나만 실행
@Scheduled("...")
public void batch1_1() {
Mono.just("1")
.publishOn(Schedulers.single())
.doOnNext(TestRouter::wait1000)
.subscribe();
}
@Scheduled("...")
public void batch1_2() {
Mono.just("2")
.publishOn(Schedulers.single())
.doOnNext(TestRouter::wait1000)
.subscribe();
}
위 상황처럼 배치가 여러개 돌아갈 때
한번에 하나씩만 수행하도록 하고 싶다면, single() 스케쥴러를 통해 스케쥴링 하면 된다.
이러면 batch1_1과 batch1_2가 동시에 수행되는 것을 방지할 수 있다.
요약
- Reactor진영에서는 Scheduler를 통해 로직이 실행 될 Thread 환경을 지정할 수 있다.
- 로직의 실행 Thread에 대한 주도권을 가질 땐 PublishOn, 로직을 subscribe할 때 default를 지정 할 목적이면 subscribeOn
- Webflux에서 I/O등 시간이 걸리는 작업은 scheduler를 지정하여(elastic 추천) 수행
REFERENCE
반응형
'개발 일지' 카테고리의 다른 글
Micrometer Log Tracing (feat. Spring Boot 3) (0) | 2023.12.14 |
---|---|
Windows 10 부팅 지연 이슈 (시스템 예약 위치..) (0) | 2023.11.14 |
Spring RSocket Code 분석 (0) | 2023.10.29 |
Spring Cloud Gateway Code 분석 (2) | 2023.10.28 |
Spring Webflux, HttpHandler code 분석 (0) | 2023.10.27 |