개발 일지

Hot vs Cold Publisher (Projectreactor Advanced Features)

북극곰은콜라 2024. 2. 19. 19:46
반응형


개요

Project Reactor 기능 중 심화 개념에 해당되는 파트들을 정리

 

 


Hot Publisher And Cold Publisher

일반적으로 Webflux 등에서 사용되는 Publisher들은 subscribe()가 되어야 Element를 생성한다.
이는 모두 Cold Publisher를 사용하기 때문이며, Hot Publisher는 구독과 상관없이 Element를 생성한다.
cold와 hot의 차이점은 데이터 발행의 주도권이 누구에게 있는가 이다.

Cold Publisher

 

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE
Cold Publisher는 Subscribe()에 따라서 Data 원천으로부터 데이터를 뽑아서 element를 만든다.
따라서 Subscriber들은 구독 시점의 데이터에 따라서 element 들이 결정된다.
일반적으로 DB, Resource, Other System등이 데이터의 원천이 되며, 요청이 있을 때 만 데이터를 쿼리해서 제공하는 것이 합리적이다.

Hot Publisher

// 공식 예제
DirectProcessor<String> hotSource = DirectProcessor.create();

Flux<String> hotFlux = hotSource.map(String::toUpperCase);

// 1번 구독자 인입
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");
hotSource.onNext("green");

// 2번 구독자 인입
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
DirectProcessor는 deprecated된 개념으로 Sinks로 대체되었다.
Sinks.Many<String> hotSource = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);

// 1번 구독자 인입
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.tryEmitNext("blue");
hotSource.tryEmitNext("green");

// 2번 구독자 인입
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.tryEmitNext("orange");
hotSource.tryEmitNext("purple");
hotSource.tryEmitComplete();
Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE
Hot Publisher는 subscriber의 호출과 별도로 element를 발행하는 publisher로 볼 수 있다.
따라서 구독자가 없어도 element는 발행할 수 있으며, 발행 주도권은 publisher에게 있다.

Create Hot Publisher

just()

final class MonoJust<T> extends Mono<T> implements Fuseable.ScalarCallable<T>, Fuseable, SourceProducer<T> {
  final T value;

  MonoJust(T value) {
    this.value = Objects.requireNonNull(value, "value");
  }
  ...
}
just()로 생성하는 MonoJust, FluxArray 등 Publisher는 대표적인 Hot Publisher이다.
생성자로 value 속성을 사전에 세팅하고 이를 하위 Sequence로 전달하기 때문이다.
public void doSomeThings() {
  Mono.just(this.createString());
}
private String createString() {
  String s = "string";
  System.out.println("out: " + s);
  return s;
}

-----------------------------
out: string
실제로 아무도 subscribe하지 않았지만, createString() 메서드는 호출이 되었다.
// 명시적으로 success sink시에 실행
Mono.create(monoSink -> monoSink.success(this.createString()));

// lazy하게 실행하기 위해 defer()를 사용
Mono.defer(() -> Mono.just(this.createString()));
Cold하게 생성하면 의도되지 않은 메서드 호출을 방지할 수 있다.
하지만, cold하게 동작하기 때문에, createString은 element 마다 호출된다.

publish()

ConnectableFlux는 Flux의 publish()로 생성되는 Hot Publisher이다.
cold → hot으로 변경하는 대표적인 방법이다.
자세한 사항은 후술

 


cache()

public final Flux<T> cache(int history) {
  return this.replay(history).autoConnect();
}

public final ConnectableFlux<T> replay(int history) {
  return history == 0 ? onAssembly((ConnectableFlux)(new FluxPublish(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE), false))) : onAssembly((ConnectableFlux)(new FluxReplay(this, history, 0L, (Scheduler)null)));
}
cache() 또한 hot publisher로 만들 수 있는 방법이다.
내부적으로는 ConnectableFlux를 생성하고, element 제공 방식 중 buffer를 통한 캐싱방식으로 동작한다.
자세한 사항은 후술

 


ConnectableFlux

// Publisher 생성
Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

// ConnectableFlux로 변경
ConnectableFlux<Integer> co = source.publish();

// 구독자 1
co.subscribe(System.out::println, e -> {}, () -> {});
// 구독자 2
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

// upstream 구독자들을 트리거
co.connect();
done subscribing
will now connect
subscribed to source
1
1
2
2
3
3
기본적인 흐름은 ConnectableFlux에 subscribe()한 구독자는 즉시 element를 전달받는 것이 아니라
ConnectableFlux가 준비되고 connect()를 호출해야 element를 전달 받는다.
이는 Publisher가 여러 subscriber들이 구독하기를 기다렸다가, 한번에 publish(broadcasting)할 수 있는 방법이다.

ConnectableFlux와 BackPressure

Projectreactor는 BackPressure가 핵심 개념 중 하나이다.
얼핏 ConnectableFlux는 BackPressure에 정면으로 위배하는 것처럼 보인다.
ConnectableFlux는 이를 해결하기 위한 두 가지 방법을 제시해준다.
1. publish()를 통한 생성: 동적으로 여러 구독자들의 demand를 모니터링 하며, 하나라도 구독자가 pending된다면, 즉시 소스로의 요청을 중단한다.
2. replay()를 통한 생성: 첫 번째 구독 데이터를 버퍼링하며, 이후 구독자에게 demand에 따라서 이 데이터를 제공한다.
위 와 같은 방식으로 ConnectableFlux는 BackPressure를 유지하는 정책을 가지고 있다.

cache with ConnectableFlux

final class FluxReplay<T> extends ConnectableFlux<T> implements Scannable, Fuseable, OptimizableOperator<T, T> {
  final CorePublisher<T> source;
  ...

  public final CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) throws Throwable {
    while(true) {
      // replay용 subscriber 생성
      ReplaySubscriber<T> c = this.connection;
      ...

      // 전달받은 subscriber와 ReplaySubscriber로 ReplayInner 생성
      ReplayInner<T> inner = new ReplayInner(actual, c);
      ...

      // 앞에 생성한 ReplaySubscriber의 buffer를 통해 ReplayInner를 Replay
      c.buffer.replay(inner);
      ...
    }
  }
  ...

  static final class ReplayInner<T> implements ReplaySubscription<T> {
    final CoreSubscriber<? super T> actual;
    final ReplaySubscriber<T> parent;
    int index;
    int tailIndex;
    Object node;
    int fusionMode;
    long totalRequested;
    ...
  }

  // ReplayBuffer의 구현체를 가지고 있음
  static final class ReplaySubscriber<T> implements InnerConsumer<T>, Disposable {
    final ReplayBuffer<T> buffer;
    ...
    
    public void onNext(T t) {
      ReplayBuffer<T> b = this.buffer;
      if (b.isDone()) {
        Operators.onNextDropped(t, this.currentContext());
      } else {
        ++this.produced;
        // onNext가 처리될 때 마다 buffer에 value를 추가
        b.add(t);
        ...
      }
    }
  }
  
  static final class UnboundedReplayBuffer<T> implements ReplayBuffer<T> {
    ...
    // subscribe mode에 따른 
    public void replay(ReplaySubscription<T> rs) {
      if (rs.enter()) {
        if (rs.fusionMode() == 0) {
          this.replayNormal(rs);
        } else {
          this.replayFused(rs);
        }

      }
    }
    
    ...
    
    void replayNormal(ReplaySubscription<T> rs) {
      int missed = 1;
      Subscriber<? super T> a = rs.actual();
      ...

      do {
        ...
        // node(element값)들을 ReplayInner에서 가져옴
        Object[] node = (Object[])((Object[])rs.node());
        ...
        while(e != r) {
          ...

          // 현재 index가 buffer된 사이즈라면, 다음 값을 가져오기 위해 break
          empty = index == this.size;
          if (empty) {
            break;
          }
          
          
          // tailIndex는 현재 처리할 노드
          T v = node[tailIndex];
          a.onNext(v);
          // index를 한칸 뒤로
          ++tailIndex;
          ++index;
          ...
        }

        ...

        rs.index(index);
        rs.tailIndex(tailIndex);
        rs.node(node);
        ...
      } while(missed != 0);
    }
  }

  static final class SizeBoundReplayBuffer<T> implements ReplayBuffer<T> {
    // 사이즈가 정해진 replay 시나리오, 중첩구조를 가진 Node(AtomicReferenece + index)를 사용
    // 나머지는 UnboundedReplayBuffer와 대동소이
    ...
  }

  static final class SizeAndTimeBoundReplayBuffer<T> implements ReplayBuffer<T> {
    // 사이즈와 time이 정해진 시나리오, 중첩구조를 가진 TimeNode(AtomicReferenece + index + time)를 사용
    // 나머지는 UnboundedReplayBuffer와 대동소이
    ...
  }

  ...
}
cache()의 동작방식은
1. 최초 subscribe시 buffer에 처리된 값을 넣고
2. 이후 subscribe는 buffer에서 값을 빼서 전달해주는 방식
cache() 또한 ConnectableFlux을 확장한 FluxReplay를 사용하기 때문에
Hot Publisher로 분류된다.

 


Conclusion

일반적인 Backpressure 개념과 어울리는 cold와 다르게, hot Source은 publisher 쪽에서 주도적으로 element를 발행한다.
Hot Publisher는 Hot Source를 취급할 수 있는 Publisher이며, 대부분 reactive를 지원하는 프레임워크 및 라이브러리에서 사용 중이다.
 
대부분의 비즈니스는 cold 만으로 충분하다.
일부 비즈니스는 cold → hot으로 전환하는 것이 유리하지만, hot publisher는 backpressure buffer, replay 정책등을 잘 고려해서 사용해야 한다.

 


REFERENCE

https://projectreactor.io/docs/core/release/reference/#advanced

https://m.blog.naver.com/gngh0101/221560774000

 

 

 

반응형

'개발 일지' 카테고리의 다른 글

ProjectReactor Case Study  (0) 2024.02.22
Global Hooks And Context Propagation (Projectreactor Advanced Features)  (1) 2024.02.20
Clean Code in Reactive  (1) 2024.01.25
GraphQL Overview  (1) 2024.01.15
Netflix OSS란 (feat. Service Mesh)  (0) 2024.01.05