개발 일지

Java Reactive Streams Publisher / Subscriber 분석 (projectreactor)

북극곰은콜라 2024. 3. 26. 20:24
반응형


개요

ReactibeStreams의 사상 및 목적 조사 및 Publisher / Subscriber의 실제 동작 확인
Java의 Reactive Programing의 구현 사항 및 동작 원리 분석

 


Reactive Programing 이란

정의

데이터 스트림의 구성 및 변화의 전파에 대한 선언적 프로그래밍 패러다임
이 패러다임을 도입하면 배열 또는 이벤트 스트림을 쉽게 표현할 수 있다.

목적

1. (비동기적인) 이벤트 처리를 표현 및 구현하기 위해
2. 데이터 스트림을 표현 및 구현하기 위해
3. 반응형 시스템을 구현하기 위해
실시간 데이터의 변화를 효율적으로 반영하여 제공하기 위해 탄생했다.

원칙

1. Responsive: 시스템은 요청/호출에 대해 즉각 응답할 수 있어야 한다.
2. Resilient: 장애에 대한 탄력성이 요구됩니다. (복제, 봉쇄, 격리, 위임 등 기법 활용)
3. Elastic: 변화에 유연함이 요구 됩니다. 환경 및 작업량에 대응할 수 있어야 합니다.
4. Message Driven: 메시지에 기반한 시스템을 구성해야 합니다.

 


ReactiveStreams

개념

official page: https://www.reactive-streams.org/

 - Reactive programing의 stream을 구성하기 위한 API를 정의한 단체, java 외 javascript, swift 등 여러 언어에 대한 API 및 Rule을 정의했다.
 - 기본적으로 Reactive Programing을 표현하기 위해서 데이터 제공자(Publisher), 데이터 소비자(Subscriber)를 분리해 정의했다.
 - Java의 reactive code API는 java9의 concurrent.Flow를 이용하여 정의되었다.
 - 총 4개의 interface를 정의했으며, 각 구현에 Rule을 정의했으며 각각 publisher, Subscriber, Subscription, Processor이다.

Core IDEA

Data의 발행 주체와 소비 주체를 분리하는 것이 핵심 아이디어이다. Publisher는 Data 발행에 집중하고,. Subscriber는 데이터의 소비에 집중하는 구조를 지향했다.

실시간 Data 전파 구성 방식 (reactiveStreams 이전)

 - PUSH 방식: Publisher가 Data를 발행할 수 있을 때 Subscriber에게 제공(호출)한다.
 - POLL 방식: Subscriber가 소비할 준비가 되었을 때 Data를 요청한다.

문제점

1. PUSH 방식 문제: Subscriber의 상태와 관계없이 Data를 PUSH 하기 때문에, Subscriber의 문제를 대응하기 쉽지 않은 상황이 발생한다. 가령 subscriber의 지연이 발생했을 때 publisher는 이를 기다리거나, subscriber에게 buffer(or queue)를 두어서 해결해야 한다.
2. POLL 방식 문제: Publisher의 Data를 지속적으로 체크하기 위한 요청 오버헤드가 발생한다. 이는 시스템이 복잡하고 커질수록 트래픽이 증가하는 결과를 가져오며 비용이 증가하고, 안정성이 떨어진다.

해결

ReactiveStreams에서는 Publisher는 Data 제공만 관심을 가지고, Subscriber는 소비만 관심을 가진다. 이외 Data 전달에 대해서는 중간 계층인 Subscription을 두고 해결한다.
Publisher는 Data가 준비되면 각 Subscription에게 Data를 제공하고 Subscriber는 소비할 수 있을 만큼만 Subscription에게 제공받는다.
여기서 Backpressure라는 핵심 개념이 들어가는데, Subscriber는 소비할 수 있는 만큼 Subscription에게 request 하며, 데이터를 request 한 만큼 push 받는 개념이다. 이 개념을 통해서 Subscriber는 buffer 없이 Data를 안정적으로 제공받을 수 있다.

1. Publisher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
id Rule
1 (Publisher->Subscriber onNext).count <= Subscriber.(subscription.request).totalcount
onNext의 호출은 request element보다 작아야 한다.
2 Publisher onNext도중 subscription을 종료하고 onComplete, onError signal로 변경할 수 있다.
3 publisher가 제공하는 모든 signal seial(sync)해야 한다.
4 publisher의 오류 발생 시 onError signal을 주어야 한다.
5 publisher의 정상 종료 시 onComplete signal을 주어야 한다.
6 onError, onComplete signal 발생 시 subscription cancel로 간주한다.
7 terminate 상태일 시 더 이상 signal을 발생 시키면 안된다.
8 subscription cancel되면 subsciber에게 signal을 주면 안된다.
9 publisher의 subscribe 메서드는 아래 사항을 구현해야 한다.
● Subscription을 생성해야 한다.
● subscriber.onSubscribe(Subscription)를 호출해야 한다.
● throw 없이 종료 되어야 한다. (parameter가 null인 경우 제외)
에러 발생 시 onError를 호출 해야한다. (onSubscribe 호출 이후)
10 다른 Subscriber에 대해서 여러번 호출 될 수 있어야 한다.
11 여러 subscriber를 지원하는 것이 권장된다.
Publisher는 Subscription 및 Data의 생성을 담당한다.
Subscription은 subscribe 호출 시 생성하며, 호출 당 하나의 subscription을 만든다.
Data는 자체적으로 생성하거나 상위 스트림에서 받을 수 있어야 한다.

2. Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
id Rule
1 onSubscribe(Subscription)에서 Subscription.request(long n)로 onNext signal을 받아야 한다.
2 Publisher responsivity를 저해할 수 있다면 async하게 signaling을 권장
3 onComplete() onError() 메서드에서 subscription또는 publisher를 호출하면 안된다.
4 onComplete onError subscription cancel되었다고 간주한다.
5 subscription이 진행된 후 onSubscribe() 요청의 subscription cancel()한다.
6 subscription이 필요 없어지는 경우 cancel()해야 한다.
7 publisher가 제공하는 모든 signal serial(sync) 여부를 확인해야 한다.
8 pending element가 있을 수 있으니 cancel() 호출 이후 onNext()를 받을 수 있어야 한다.
9 Subscription.request() 호출 전에 onComplete 신호를 받을 수 있어야 한다.
10 Subscription.request() 호출 전에 onError 신호를 받을 수 있어야 한다.
11 signal 처리는 각각 수행되어야 합니다.
12 onSubscribe()는 최대 한번 호출 됩니다.
13 모든 메서드는 thrown 없이 정상 종료 되어야 합니다. 하지만 parameter null인 경우 nullPointException을 발생 시킵니다.
Subscriber는 Data 소비에 대한 기능을 담당한다.
publisher에게 subscribe(Subscriber)로 전달되면, onSubscribe가 한번 무조건 호출되며, 이때 Subscription에게 element를 request(n) 해야 한다.
이후 각 signal이 호출될 수 도 안 될 수 도 있다.

3. Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}
id Rule
1 request() cancel() Subscriber만 호출된다.
2 request() subscriber onNext, onSubscribe에서 호출된다.
3 request() 호출의 재귀 현상에 대한 제한을 두어야 한다.
4 request()는 최대한 지연 없이 구현되어야 합니다.
5 cancel()은 최대한 지연 없이 구현되어야 합니다.
6 cancel() 이후 호출되는 request()는 동작하면 안된다.
7 cancel() 이후 호출되는 cancel()은 동작하면 안된다.
8 cancel 되지 않은 상황에서 request()는 반복 호출 될 수 있다.
9 request(long n)에서 parameter <= 0 인 경우 onError IllegalArgumentException과 함께 발생 시킨다.
10 request()는 subscriber의 onNext를 sync하게 호출 한다 (cancel 되지 않았다면)
11 request()는 subscriber의 onComplete, onError를 sync하게 호출 한다 (cancel 되지 않았다면)
12 첫 cancel() 호출 시 publisher에게 signal 중단을 notify 해야 한다.
13 cancel() 호출 시 publisher에게 subscriber에 대한 reference를 끊도록 notify 해야 한다.
14 cancel()이 호출되어 모든 subscription이 취소되면 publisher shutdown을 할 것이다.
15 cancel() 호출은 throw 없이 종료 되어야 한다.
16 request() 호출은 throw 없이 종료 되어야 한다.
17 request(n) n 개수는 Long.MAX_VALUE까지 지원해야 하며, Long.MAX_VALUE는 무한으로 인식
Subscription은 publisher와 Subscriber 사이에서 data flow(signaling)를 표현한다.
Publisher에 의해 subscribe() 될 때마다 하나씩 생성되며, 하나의 Subscriber와 연관된다.

4. Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
id Rule
1 Subscriber Publisher Rule 모두 만족해야 합니다.
2 onError signal을 자체적으로 handling할 수 있지만, subscription cancel 해야 합니다.
Publisher + Subscriber
ProjectReactor에서 해당 인터페이스의 구현체들은 deprecated 되었다. (will be removed in 3.5)
https://github.com/reactor/reactor-core/issues/2431

 


ReactiveStreams Example

위 Rule을 바탕으로 간단한 예시 코드를 작성했다.
예시를 통해 Pub → Sub에 대한 전체적인 흐름을 파악하고, 각 요소들의 역할에 대해 설명하고자 한다.
(projectreactor가 구현한 Pub/Sub의 구현체인 Mono, Flux가 복잡하여 핵심 요약이 힘들다.)
구현 사항:
 - publisher: random 한 int값을 max 개수만큼 생성한다.
 - subscriber: n개 구독하여 element를 받을 때마다 더하는 구독자
 - subscription: n개만큼 request 해서 내려주고, 많으면 cancel 하고 완료되면 complete, error는 error처리
 - processor: 지정된 int 함수를 실행하면서 pub / sub 연결
명확한 flow 파악을 위해 전체적으로 sync 하게 구현했다.
Mono/Flux는 이를 비동기, 논블로킹, backpressure 등 개념을 적용해서 구현되어 있다.

Sample Implementation

SimplePublisher

public class SimplePublisher implements Publisher<Integer> {
  private final List<Integer> dataSources;
  private final Map<String, Subscription> subscriptions = new HashMap<>();

  private SimplePublisher(final List<Integer> dataSources) {
    this.dataSources = dataSources;
  }

  public static SimplePublisher create(final int maxCount) {
    return new SimplePublisher(
        IntStream.generate(() -> (int) (Math.random() * 100))
            .limit(maxCount)
            .boxed()
            .toList());
  }

  @Override
  public synchronized void subscribe(final Subscriber<? super Integer> subscriber) {
    System.out.println("[SimplePublisher] subscribe by: " + subscriber.getClass().getSimpleName());
    String id = UUID.randomUUID().toString();
    SimpleSubscription subscription = new SimpleSubscription(id, subscriber, this.dataSources, this);
    this.subscriptions.put(id, subscription);
    subscriber.onSubscribe(subscription);
  }

  public void cancelSignal(final String subscriptionId) {
    System.out.println("[SimplePublisher] cancelSignal");
    this.subscriptions.get(subscriptionId).cancel();
    this.deleteSubscription(subscriptionId);
  }

  public void deleteSubscription(final String subscriptionId) {
    System.out.println("[SimplePublisher] subscription removed");
    this.subscriptions.remove(subscriptionId);
  }
}
Publisher의 메인 역할
1. 구독자에 따른 Subscription 관리 (create, remove)
2. provide element
예시는 subscribe시마다 subscription을 생성하여 map으로 관리한다.
Publisher의 dataSource는 생성 시 결정 (like Mono.just())

SimpleSubscriber

@RequiredArgsConstructor
public class SimpleSubscriber implements Subscriber<Integer> {
  private final int subscribeCount;
  private int current = 0;

  @Override
  public void onSubscribe(final Subscription subscription) {
    System.out.println("[SimpleSubscriber] onSubscribe, subscription: " + subscription.getClass().getSimpleName());
    subscription.request(subscribeCount);
  }

  @Override
  public void onNext(final Integer t) {
    current += t;
    System.out.println("[SimpleSubscriber] next: " + t + ", current: " + this.current);
  }

  @Override
  public void onError(final Throwable throwable) {
    System.out.println("[SimpleSubscriber] error: " + throwable.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("[SimpleSubscriber] complete, final: " + this.current);
  }
}
Subscriber의 역할
1. subscribe시 subscription을 통해 publisher로 element를 요청 (with count)
2. element가 올 때마다 처리
3. error 처리
4. complete 처리
예시는 생성자로 받은 maxCount만큼 subscription에게 요청한다.
element가 올 때 마다 값을 더하여 유지한다.
완료 시 지금까지 더한 값을 표시한다.

SimpleSubscription

@RequiredArgsConstructor
public class SimpleSubscription implements Subscription {
  private final String subscriptionId;
  private final Subscriber<? super Integer> subscriber;
  private final List<Integer> source;
  private final SimplePublisher parent;

  @Override
  public void request(final long n) {
    System.out.println("[SimpleSubscription] request n: " + n);
    try {
      this.source.stream().limit(n).forEach(subscriber::onNext);
      if (source.size() > n) {
        this.parent.cancelSignal(this.subscriptionId);
        return;
      }
      this.subscriber.onComplete();
      this.parent.deleteSubscription(this.subscriptionId);
    } catch (Exception e) {
      this.subscriber.onError(e);
    }
  }

  @Override
  public void cancel() {
    System.out.println("[SimpleSubscription] cancel");
  }
}
Subscription의 역할
1. request 받은 개수만큼 subscriber에게 제공 (onNext)
2. subscriber에게 complete signal 생성
3. subscriber에게 error signal 생성
4. publisher에게 cancel signal 생성
5. cancel처리
예시에서는 request시 publisher에게 제공받은 source를 subscriber에게 순차적으로 제공한다.
subscriber의 request에 따라서 자동으로 cancel을 publisher로 호출한다. (requestCount < elementCount)
완료 및 에러 발생 시 subscriber에게 signal 생성(호출)

SimpleProcessor

@RequiredArgsConstructor
public class SimpleProcessor implements Processor<Integer, Integer> {
  private final Function<Integer, Integer> addOne = i -> i + 1;
  private final Function<Integer, Integer> subOne = i -> i - 1;

  private final Publisher<Integer> delegatePublisher;
  private Subscriber<? super Integer> delegateSubscriber;
  private final String functionName;

  public static SimpleProcessor from(final Publisher<Integer> from, final String functionName) {
    return new SimpleProcessor(from, functionName);
  }

  @Override
  public void subscribe(final Subscriber<? super Integer> subscriber) {
    System.out.println("[SimpleProcessor] subscribe by: " + subscriber.getClass().getSimpleName());
    this.delegateSubscriber = subscriber;
    this.delegatePublisher.subscribe(this);
  }

  @Override
  public void onSubscribe(final Subscription subscription) {
    System.out.println("[SimpleProcessor] onSubscribe, subscription: " + subscription.getClass().getSimpleName());
    this.delegateSubscriber.onSubscribe(subscription);
  }

  @Override
  public void onNext(final Integer t) {
    System.out.println("[SimpleProcessor] next: " + t);
    this.delegateSubscriber.onNext(apply(t));
  }

  @Override
  public void onError(final Throwable throwable) {
    System.out.println("[SimpleProcessor] error: " + throwable.getMessage());
    this.delegateSubscriber.onError(throwable);
  }

  @Override
  public void onComplete() {
    System.out.println("[SimpleProcessor] complete");
    this.delegateSubscriber.onComplete();
  }

  private Integer apply(final int origin) {
    return switch (this.functionName) {
      case "addOne" -> this.addOne.apply(origin);
      case "subOne" -> this.subOne.apply(origin);
      default -> origin;
    };
  }
}
Processor는 publisher와 subscriber의 책임을 모두 가진다.
예시에서는 pub/sub을 delegation 하며, onNext 시 지정된 함수를 적용하여 내려주고 있다.
// Subscription 생성은 생략했다.

Sample Flow 분석

1. Publish → Subscribe Flow

// init Pub / Sub
SimplePublisher simplePublisher = SimplePublisher.create(5);
SimpleSubscriber simpleSubscriber = new SimpleSubscriber(5);

// use Pub / Sub
simplePublisher.subscribe(simpleSubscriber);
// log
[SimplePublisher] subscribe by: SimpleSubscriber
[SimpleSubscriber] onSubscribe, subscription: SimpleSubscription
[SimpleSubscription] request n: 5
[SimpleSubscriber] next: 11, current: 11
[SimpleSubscriber] next: 35, current: 46
[SimpleSubscriber] next: 32, current: 78
[SimpleSubscriber] next: 29, current: 107
[SimpleSubscriber] next: 94, current: 201
[SimpleSubscriber] complete, final: 201
[SimplePublisher] subscription removed
생성 및 구독 단계
 - 시작 전 Element 5개를 제공하는 Publisher와 5개를 구독하여 처리할 Subscruber 인스턴스를 생성했다.
 - publisher는 subscribe(Subscriber) 호출을 통해 전체적인 Flow를 관장할 Subscription을 생성한다.
 - publisher는 Subscription 생성 후 subscriber에게 onSubscribe를 생성하며, 아직 데이터를 전달하진 않았다.
 - subscriber는 onSubscribe() 호출받았으며 Subscription에게 onRequest()로 Element(Data) 발행을 요청한다.
 - Subscription은 Subscriber의 각 signal handler를 호출한다.

Data processing 단계
 - Subscription은 상황에 맞게 Subscriber의 onNext(), onError(), onComplete() 호출한다.
 - Subscriber는 각 signal에 대한 처리를 한다.

2. with Cancel Flow

SimplePublisher simplePublisher = SimplePublisher.create(5);
simplePublisher.subscribe(new SimpleSubscriber(3));
[SimplePublisher] subscribe by: SimpleSubscriber
[SimpleSubscriber] onSubscribe, subscription: SimpleSubscription
[SimpleSubscription] request n: 3
[SimpleSubscriber] next: 44, current: 44
[SimpleSubscriber] next: 70, current: 114
[SimpleSubscriber] next: 60, current: 174
[SimplePublisher] cancelSignal
[SimpleSubscription] cancel
[SimplePublisher] subscription removed
Request보다 Element가 많을 시 cancel 요청을 보내서 subscription을 cancel 처리하는 예시이다.

3. Publish → Processor → Subscriber Flow

예시 프로세스:
1. 사용자는 processor객체를 parent publisher를 가지고 생성한다.
2. processor를 통해 subscribe()하면, processor는 자신을 publisher에게 subscriber로 넘긴다.
3. publisher는 processor를 subscriber로 간주하고 Subscription을 생성한다.
4. onSubscribe는 publisher → processor → subscriber 순으로 진행
5. subscriber는 subscription에게 request 한다.
6. onNext 등 signal은 Subscription → processor → subscriber로 진행
Processor는 Pub + Sub이며, 중간 비즈니스를 담당한다.
예시 Processor는 delegate 할 pub / sub을 받아서 onNext에만 개입하여 function을 execute 하는 역할을 가진다.

Sample Conclusion

전반적인 reactibestreams의 rule에 따른 구현은 위와 같이 구성할 수 있다.
이 구조는 Publisher나 Subscriber가 유연하게 확장될 수 있으며, backpressure 개념이 들어가 있다.
기존 push / poll 구조의 단점인 subscriber buffer 또는 periodic call 문제가 없이, data stream을 구성할 수 있다.
위 구성에 비동기적 동작 및 디테일을 붙이면 실시간 data처리가 가능한 backpressure 기반 stream이 구현된다.

 


ProjectReactor

reactivestreams의 구현한 프로젝트이다.
주요 구현으로는 Publisher, Subscriber, Subscription, processor(will be removed in 3.5)가 있다.
구현체 몇 가지를 살펴보며 설명을 진행하고자 한다.
projectreactor는 추가적으로 비동기처리를 지원하기 위한 scheduler를 정의하고 구현했다.
(scheduler 참고: https://p-bear.tistory.com/68)

MonoJust

Create형 Publisher
final class MonoJust<T> extends Mono<T> implements Fuseable.ScalarCallable<T>, Fuseable, SourceProducer<T>  {
	final T value;

	MonoJust(T value) {
		this.value = Objects.requireNonNull(value, "value");
	}
	...

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		actual.onSubscribe(Operators.scalarSubscription(actual, value));
	}
	...
}
Publisher를 “create” 할 수 있는 대표적인 구현체이다. (Mono.just(…))
생성자에서 단일 값을 받아 유지하며, Subscribe 시 해당 값으로 subscription을 생성하여 onSubscribe를 호출하는 것을 볼 수 있다.

MonoDefer

Deleagete형 Publisher
final class MonoDefer<T> extends Mono<T> implements SourceProducer<T> {
	final Supplier<? extends Mono<? extends T>> supplier;

	MonoDefer(Supplier<? extends Mono<? extends T>> supplier) {
		this.supplier = Objects.requireNonNull(supplier, "supplier");
	}

	@SuppressWarnings("unchecked")
	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		Mono<? extends T> p;

		try {
			p = Objects.requireNonNull(supplier.get(), "The Mono returned by the supplier is null");
		}
		catch (Throwable e) {
			Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
			return;
		}

		fromDirect(p).subscribe(actual);
	}
	...
}
상위 Publisher(Mono)를 생성자로 받아서 유지하며, subscribe 시 해당 publisher에게 subscribe를 위임한다. (위임 앞뒤로 추가 로직들을 수행한다.)
just와 달리 onSubscribe에 관여하지 않으며, 상위 스트림에 subscribe()를 연결하는 역할만 한다.

LamdaSubscriber

Execute UserFunction Subscriber

final class LambdaSubscriber<T> implements InnerConsumer<T>, Disposable {
	final Consumer<? super T>            consumer;
	final Consumer<? super Throwable>    errorConsumer;
	final Runnable                       completeConsumer;
	final Consumer<? super Subscription> subscriptionConsumer;
	final Context                        initialContext;
	volatile Subscription subscription;

    LambdaSubscriber(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Consumer<? super Subscription> subscriptionConsumer,
			@Nullable Context initialContext) {
		this.consumer = consumer;
		this.errorConsumer = errorConsumer;
		this.completeConsumer = completeConsumer;
		this.subscriptionConsumer = subscriptionConsumer;
		this.initialContext = initialContext == null ? Context.empty() : initialContext;
	}
    ...

	@Override
	public final void onSubscribe(Subscription s) {
		if (Operators.validate(subscription, s)) {
			this.subscription = s;
			if (subscriptionConsumer != null) {
				try {
					subscriptionConsumer.accept(s);
				}
				catch (Throwable t) {
					Exceptions.throwIfFatal(t);
					s.cancel();
					onError(t);
				}
			}
			else {
				s.request(Long.MAX_VALUE);
			}
		}
	}

	@Override
	public final void onComplete() {
		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
		if (s == Operators.cancelledSubscription()) {
			return;
		}
		if (completeConsumer != null) {
			try {
				completeConsumer.run();
			}
			catch (Throwable t) {
				Exceptions.throwIfFatal(t);
				onError(t);
			}
		}
	}

	@Override
	public final void onError(Throwable t) {
		Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
		if (s == Operators.cancelledSubscription()) {
			Operators.onErrorDropped(t, this.initialContext);
			return;
		}
		if (errorConsumer != null) {
			errorConsumer.accept(t);
		}
		else {
			Operators.onErrorDropped(Exceptions.errorCallbackNotImplemented(t), this.initialContext);
		}
	}

	@Override
	public final void onNext(T x) {
		try {
			if (consumer != null) {
				consumer.accept(x);
			}
		}
		catch (Throwable t) {
			Exceptions.throwIfFatal(t);
			this.subscription.cancel();
			onError(t);
		}
	}
	...
}
일반적으로 Mono…subscribe(…) 로 method chain을 구성하면 생성되는 subscriber 구현체이다.
생성자로 각 signal 처리를 담당하는 consumer 또는 Runnable 받는다.
각 signal에 대응하는 로직이 있으면 실행 및 실행 결과에 대한 에러처리

ScalarSubscription

publish single element once syncronously
static final class ScalarSubscription<T> implements Fuseable.SynchronousSubscription<T>, InnerProducer<T> {
	final CoreSubscriber<? super T> actual;
	final T value;
	@Nullable
	final String stepName;
	volatile int once;

	ScalarSubscription(CoreSubscriber<? super T> actual, T value, String stepName) {
		this.value = Objects.requireNonNull(value, "value");
		this.actual = Objects.requireNonNull(actual, "actual");
		this.stepName = stepName;
	}

	@Override
	public void cancel() {
		if (once == 0) {
			Operators.onDiscard(value, actual.currentContext());
		}
		ONCE.lazySet(this, 2);
	}
	...

	@Override
	public void request(long n) {
		if (validate(n)) {
			if (ONCE.compareAndSet(this, 0, 1)) {
				Subscriber<? super T> a = actual;
				a.onNext(value);
				if(once != 2) {
					a.onComplete();
				}
			}
		}
	}
	...
}
Mono.just 등으로 생성되는 Subscription 중 하나이다.
하나의 element를 subscriber에게 전달하는 구독으로, 생성 시점에 element하나와 subscriber를 받는다.
request 호출 시 가지고 있는 element를 onNext로 내려주고, complete signal을 보낸다.

 


Conclusion

ReactiveStreams

기존 PUSH / POLL 방식의 한계인 subscriber buffer 또는 perodic call 이슈를 Backpressure를 통해 해결할 수 있도록 정의한 Interface 및 Rule
 - Publisher: Data 발행에 관심을 가지고 있으며, Subscription을 생성 및 관리한다.
 - Subscriber: Data 소비에 관심을 가지고 있으며, BackPressure 개념을 도입했다. 소비할 element 개수를 Subscription에게 request 하고, onNext, onComplete 등 signal handling function을 Subscription이 호출한다.
 - Subscription: Pub / Sub의 Flow를 control 하는 역할을 가지고 있으며, publisher에게 data를 받을 수 있고, subscriber에게 request 개수대로 onNext 호출을 하는 등 작업을 한다.
 - Processor: Publisher + Subscriber

ProjectReactor

ReactiveStreams의 Interface를 구현한 프로젝트
Method Chain으로 Data Pipeline(Stream)을 구성할 수 있도록 구현했다. 실제 동작은 ReactiveStreams의 사상을 따르며, Function들은 비동기(ExecutorService) 위에서 동작한다.
Webflux, RSocket 등 Data Pipeline을 비동기로 구성하는 lib/framework에서 Base로 사용된다.

 


REFERENCE

https://github.com/reactive-streams/reactive-streams-jvm

https://www.reactive-streams.org/

https://github.com/reactor/reactor-core

 

 

 

반응형

'개발 일지' 카테고리의 다른 글

VoltDB란  (0) 2024.06.03
ksqlDB 란  (0) 2024.05.27
Spring Webflux with EventListener  (0) 2024.03.20
[Kafka] Parallel Consumer  (0) 2024.03.19
TURN Protocol (+ STUN Message)  (1) 2024.03.18