개발 일지

Global Hooks And Context Propagation (Projectreactor Advanced Features)

북극곰은콜라 2024. 2. 20. 19:59
반응형


Global Hooks란

리액터는 연산자의 콜백을 전역으로 설정할 수 잇다. 이는 Operator에 element가 흐르는 로직에 관여하는 것과 비슷하다.
Hooks 클래스로 제공되는 기능으로 3가지 종류가 있다.
1. Dropping Hooks
2. Internal Error Hook
3. Assembly Hooks

 


DroppingHooks

일반적으로 onError가 모든 에러를 잡을 수 있을 것으로 기대하지만, 그렇지 못한 상황이 발생한다.
가령 onComplete()를 호출 한 후 onNext()등 다른 signal를 연결했을 때 또는 handle되지 못한 에러가 있다.

Dropped By Previous Complete

// onNextDropped Example
Hooks.onNextDropped(o -> {
  System.out.println("onNextDropped called");
  System.out.println(o);
});

Mono
    .from(subscriber -> {
      subscriber.onComplete();
      subscriber.onNext("");
    })
    .doOnError(throwable -> System.out.println("doOnError Call!"))
    .subscribe();
    
>> onNextDropped called
>> inner

// onErrorDropped Example
Hooks.onErrorDropped(o -> {
  System.out.println("onErrorDropped called");
  System.out.println(o.getMessage());
});

Mono
    .from(subscriber -> {
      subscriber.onComplete();
      subscriber.onError(new RuntimeException("inner"));
    }).doOnError(throwable -> System.out.println("doOnError Call!"))
    .subscribe();
    
>> onErrorDropped called
>> inner
Publisher에서 onComplete() 이후 onNext(), onError()를 호출하면, 잘못된 reactive flow로 에러가 나고doOnError()에 잡힐 꺼 같지만
error signal은 발생하지 않으며, onNext(), onError()는 drop처리된다.
이 부분을 Hooks로 잡아내서 처리할 수 있다.

Dropped Error by not handled

Hooks.onErrorDropped(throwable -> System.out.println("error dropped: " + throwable.getMessage()));

Flux.just(1, 2, 3, 4, 0)
        .doOnNext(i -> System.out.println("element: " + i))
        .map(i -> 10 / i)
        .subscribe();

// hooks에 onErrorDropped를 설정하지 않는다면, default onErrorDropped로 처리된다.
>> [ERROR] (main) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: ...
처리되지 못한 Error Signal들은 결국 drop 처리된다.
이를 Hook으로 drop되기전에 처리할 수 있다.
// onErrorDropped에 등록되는 function은 append 된다.
Hooks.onErrorDropped(o -> System.out.println("onErrorDropped called"));
Hooks.onErrorDropped(o -> System.out.println("onErrorDropped2 called"));
Hooks.onErrorDropped(o -> System.out.println("onErrorDropped3 called"));

>> onErrorDropped called
>> onErrorDropped2 called
>> onErrorDropped3 called

// 1,2번은 초기화로 인하여 제거
Hooks.onErrorDropped(o -> System.out.println("onErrorDropped called"));
Hooks.onErrorDropped(o -> System.out.println("onErrorDropped2 called"));
Hooks.resetOnErrorDropped();
Hooks.onErrorDropped(o -> System.out.println("onErrorDropped3 called"));
    
>> onErrorDropped3 called

동작원리

// Operators.class
public static <T> void onNextDropped(T t, Context context) {
    ...
	Consumer<Object> hook = context.getOrDefault(Hooks.KEY_ON_NEXT_DROPPED, null);
	if (hook == null) {
		hook = Hooks.onNextDroppedHook;
	}
	if (hook != null) {
		hook.accept(t);
	}
	else if (log.isDebugEnabled()) {
		log.debug("onNextDropped: " + t);
	}
}

// MonoFlatMap
@Override
public void onNext(T t) {
	if (done) {
		Operators.onNextDropped(t, actual.currentContext());
		return;
	}
	...
}

// FluxMap.class
@Override
public void onNext(T t) {
	if (done) {
		Operators.onNextDropped(t, actual.currentContext());
		return;
	}
	...
}
Operator를 사용하면, 각 operator에 맞는 구현체들이 호출되는데 이는 Publisher → Mono or Flux를 상속 받은 클래스들이다.
이들은 onNext() 시 done이라는 boolean flag를 확인하며, done 되었을 때 Operators의 onNextDropped()를 호출한다.
Operators의 onNextDropped() 메서드는 onNextDroppedHook이 있으면, function을 accept한다.

 


Internal Error Hook

onOperatorError()는 예기치 못한 에러를 hook할 수 있는 방법이다.
operator에서 error가 발생하면 콜 되며, exception에 따른 처리 및 변경이 가능하다.
Hooks.onOperatorError((throwable, o) -> {
  if (((Integer) 0).equals(o)) {
    System.out.println("[onOperatorError] called");
    return new RuntimeException("[onOperatorError] changed");
  }
  return throwable;
});

Flux.just(1, 2, 3, 4, 0)
    .doOnNext(i -> System.out.println("element: " + i))
    .map(i -> 10 / i)
    .onErrorComplete()
    .subscribe();
    
>> element: 1
>> element: 2
>> element: 3
>> element: 4
>> element: 0
>> [onOperatorError] called
onOperatorError는 operator에 error signal이 전달 될 때 마다 hooking된다.
이는 모든 operator에 doOnError + onErrorMap을 거는 것과 유사하다.

동작원리

// Operators.class
public static Throwable onOperatorError(@Nullable Subscription subscription, Throwable error, @Nullable Object dataSignal, Context context) {
	...
	BiFunction<? super Throwable, Object, ? extends Throwable> hook = context.getOrDefault(Hooks.KEY_ON_OPERATOR_ERROR, null);
	if (hook == null) {
		hook = Hooks.onOperatorErrorHook;
	}
	if (hook == null) {
		...
		return t;
	}
	return hook.apply(error, dataSignal);
}

// FluxFlatMap.class
@Override
public void onNext(T t) {
	...
	try {
		...
	}
	catch (Throwable ex) {
		actual.onError(Operators.onOperatorError(s, ex, t, actual.currentContext()));
		return;
	}

	if (m instanceof Callable) {
		...
		try {
			...
		}
		catch (Throwable ex) {
			actual.onError(Operators.onOperatorError(s, ex, t, actual.currentContext()));
			return;
		}
		...
	}

	try {
		...
	}
	catch (Throwable e) {
		actual.onError(Operators.onOperatorError(this, e, t, actual.currentContext()));
	}
}
Operators에는 onOperatorError() 메서드를 통해 hooks에 세팅된 hook function을 불러와서 apply해준다
이는 Publisher의 구현체들에서 호출되는데, 대표적으로 onNext() 함수들의 catch문 안에서 호출 된다.

 


Assembly Hooks

Assembly Hooks는 Operator chain이 조립(초기화) 때 마다 호출된다.
대표적으로 onEachOperator가 있으며, chain이 추가될 때 마다 hooking된다.
일반적으로 reactor.core.publisher.Operators 클래스와 함께 사용 및 구현된다.
Hooks.onEachOperator(publisher -> {
  System.out.println("onEachOperator called");
  return publisher;
});

Flux.just(1, 2, 3, 4)
    .doOnNext(i -> System.out.println("doOnNext 1: " + i))
    .doOnNext(i -> System.out.println("doOnNext 2: " + i))
    .subscribe();
    
>> onEachOperator called
>> onEachOperator called
>> onEachOperator called
>> doOnNext 1: 1
>> doOnNext 2: 1
>> doOnNext 1: 2
>> doOnNext 2: 2
>> doOnNext 1: 3
>> doOnNext 2: 3
>> doOnNext 1: 4
>> doOnNext 2: 4
기본적인 onEachOperator의 실행 시점을 찍었다.
doOnNext(), doOnNext(), just(), 가 각각 호출(역순, chain, 조립) 될 때 onEachOperator가 실행되었다.

동작원리

// Flux.class
protected static <T> Flux<T> onAssembly(Flux<T> source) {
  Function<Publisher, Publisher> hook = Hooks.onEachOperatorHook;
  if (hook != null) {
    source = (Flux)hook.apply(source);
  }

  ...

  return source;
}
Flux.just(…) → onAssembly(…)
Flux.doOnNext(…) → doOnSignal(…) → onAssembly(…)
위 처럼 operator가 호출될 때 마다 onAssembly()가 호출 된다.
onAssembly()는 Hooks의 onEachOperatorHook를 참조하여, hook function을 apply 하는 형태로 동작한다.

 


LiftFunction

// Hooks.class
public static void onEachOperator(Function<? super Publisher<Object>, ? extends Publisher<Object>> onEachOperator) {
  onEachOperator(onEachOperator.toString(), onEachOperator);
}

// Operators$LiftFunction.class
final static class LiftFunction<I, O> implements Function<Publisher<I>, Publisher<O>> {
  ...
}

// Operators.class
public static <I, O> Function<? super Publisher<I>, ? extends Publisher<O>> lift(BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter) {
	return LiftFunction.liftScannable(null, lifter);
}
public static <O> Function<? super Publisher<O>, ? extends Publisher<O>> lift(
			Predicate<Scannable> filter,
			BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> lifter) {
	return LiftFunction.liftScannable(filter, lifter);
}
public static <I, O> Function<? super Publisher<I>, ? extends Publisher<O>> liftPublisher(BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter) {
	return LiftFunction.liftPublisher(null, lifter);
}
public static <O> Function<? super Publisher<O>, ? extends Publisher<O>> liftPublisher(
			Predicate<Publisher> filter,
			BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super O>> lifter) {
	return LiftFunction.liftPublisher(filter, lifter);
}
Hooks의 onEachOperator는 publisher → publisher인 function을 매 operator가 chaining될 때 호출되도록 한다.
Publisher → Publisher인 Function의 구현체 class를 reactor에서 제공하며 이는 LiftFunction.class이다.
이를 생성할 수 있는 방법으로 Operators의 lift() 및 liftPublisher() 메서드들을 제공하고있다.
Operators의 lift(), liftPublisher()는 각각 LiftFunction의 static method인 liftScannable(), liftPublisher()을 호출하여 LiftFunction 인스턴스를 생성하고 있다.

lifter: (pub, sub) → sub 함수, publisher를 decorate할 수 있는 실질적인 로직이 들어간다.
name: 일반 name
filter: function의 수행여부를 판단하는 predicate

LiftPublisher()

// LiftFunction.class
static final <I, O> LiftFunction<I, O> liftPublisher(
				@Nullable Predicate<Publisher> filter,
				BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter) {
	Objects.requireNonNull(lifter, "lifter");
	return new LiftFunction<>(filter, lifter, lifter.toString());
}
liftPublisher를 통해 생성된 LiftFunction은 CoreSubscriber를 decorate할 수 있도록 도와준다.
내부적으로 (pub, sub) → sub 인 function을 통해 subscriber에 기능을 추가한다.

LiftScannable()

static final <I, O> LiftFunction<I, O> liftScannable(
				@Nullable Predicate<Scannable> filter,
				BiFunction<Scannable, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>> lifter) {
	Objects.requireNonNull(lifter, "lifter");

	Predicate<Publisher> effectiveFilter =  null;
	if (filter != null) {
		effectiveFilter = pub -> filter.test(Scannable.from(pub));
	}

	BiFunction<Publisher, ? super CoreSubscriber<? super O>, ? extends CoreSubscriber<? super I>>
			effectiveLifter = (pub, sub) -> lifter.apply(Scannable.from(pub), restoreContextOnSubscriberIfAutoCPEnabled(pub, sub));

	return new LiftFunction<>(effectiveFilter, effectiveLifter, lifter.toString());
}
위 liftPublisher와 비슷하지만, 특수한 lifter를 가진 LiftFunction을 만드는 static 메서드이다.
(scannable, sub) → sub인 Function으로 lifter를 만든다.
Scannable이 인자인 만큼 보다 디테일한 custom이 가능하다.
Scannable이란?
Projectreactor에서 제공하는 component의 state를 탐색할 수 있는 interface이다.
대부분의 publisher 및 subscriber가 Scannable interface를 구현했으며, 이를 통해 attribute 및 publisher chain을 탐색할 수 있다.
https://projectreactor.io/docs/core/release/api/reactor/core/Scannable.Attr.html
Hooks.onEachOperator(Operators.lift(
    (scannable, coreSubscriber) -> {
      if (scannable.tagsDeduplicated().containsKey("customTag")) {
        ...
      }
      return coreSubscriber;
    }
));

Hooks.onEachOperator(Operators.liftPublisher((publisher, coreSubscriber) -> {
  if (publisher instanceof Flux<?>) {
    ...
  }
  return coreSubscriber;
}));
Operators를 활용하는 기본 방법
public class MdcContextLifterConfiguration {
  public static final String MDC_CONTEXT_REACTOR_KEY = MdcContextLifterConfiguration.class.getName();

  @PostConstruct
  @SuppressWarnings("unchecked")
  public void contextOperatorHook() {
    Hooks.onEachOperator(MDC_CONTEXT_REACTOR_KEY, Operators.lift((scannable, subscriber) -> new MdcContextLifter(subscriber)));
  }

  @PreDestroy
  public void cleanupHook() {
    Hooks.resetOnEachOperator(MDC_CONTEXT_REACTOR_KEY);
  }

  /**
   * Helper that copies the state of Reactor [Context] to MDC on the #onNext function.
   */
  @RequiredArgsConstructor
  public static class MdcContextLifter<T> implements CoreSubscriber<T> {

    private final CoreSubscriber<T> coreSubscriber;

    @Override
    public void onSubscribe(Subscription subscription) {
      coreSubscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(T t) {
      copyToMdc(coreSubscriber.currentContext());
      coreSubscriber.onNext(t);
    }

    ...

    @Override
    public Context currentContext() {
      return coreSubscriber.currentContext();
    }

    /**
     * Extension function for the Reactor [Context]. Copies the current context to the MDC, if context is empty clears the MDC.
     * State of the MDC after calling this method should be same as Reactor [Context] state.
     * One thread-local access only.
     */
    void copyToMdc(Context context) {
      if (context != null && !context.isEmpty()) {
        Map<String, String> map = context.stream()
            .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
        MDC.setContextMap(map);
      } else {
        MDC.clear();
      }
    }
  }
}

 

Reactive 환경에서 MDC context를 활용하기 위한 lifter example 이다.
@Configuration에서 Hooks에 onEachOperator로 MdcContextLifter라는 subscriber 교체를 진행한다.
MdcContextLifter는 기존 subscriber를 decorator/delegate한 구현체이다.
핵심 로직은 onNext()시 context(reactor)를 context(MDC)로 복사하는 것 이다.
// context(reactor)에 traceId를 넣는 것은 rest filter에서 subscribeContext()를 통해서 한다 (후술)

 


Context Propagation 

일반적으로 명령형 프로그래밍 시 사용하는 ThreadLocal은 리액티브 프로그래밍에서는 문제를 야기한다.
리액티브 동작방식이 잦은 Thread의 전환이 있고, 개발자가 이를 인지하기 쉽지 않기 때문이다.
대표적으로 ThreadLocal을 활용한 MDC가 있다. 사실상 Logback에서의 MDC 활용은 문제를 야기한다.
이를 해결하기 위해 리액티브에서 3.1.0버전부터 제공하는 Context라는 기능이 있다.

ContextView, Context

Context는 immutable한 map과 유사하다.
of() : Map.of()와 유사하게, immutable한 context를 생성하는 메서드
put(), putAll() : Context가 immutable하기 때문에 값을 create or update 하여 새로운 context instance를 반환
delete() : delete() 후 새로운 instance를 반환

실제 구현체는 context안의 element 수 만큼 맴버 key, value를 갖고 있으며, contextN은 LinkedHashMap을 상속받았다.

deferContextual(Function<ContextView,? extends Publisher<T>> arg)

기본적으로 sequence는 defer()와 동일하며, contextView를 참조할 수 있는 Operator이다.

contextWrite(ContextView contextToAppend)

Context를 CUD할 수 있는 Operator이다.

Use of Context

Mono.just("object")
    .delayUntil(s -> Mono.deferContextual(contextView -> printWithThread(contextView.get("traceId")))) // print
    .contextWrite(Context.of("traceId", "2-" + UUID.randomUUID()))
    .flatMap(this::doRest) // print
    .delayUntil(s -> Mono.deferContextual(contextView -> printWithThread(contextView.get("traceId")))) // print
    .contextWrite(Context.of("traceId", "1-" + UUID.randomUUID()))
    .subscribe();
    
>> [main] 2-e646d101-1c0b-48dc-bc38-05ea1cfddff1
>> [boundedElastic-1] 1-5f7b0ed2-5823-464d-ad72-ca497787e8d5 request rest >> object
>> [boundedElastic-1] 1-8c27fbd4-0100-4f80-a765-338d08b20ea0
Context는 기본적으로 Subscription Chain 전역에서 사용되는 Map으로 이해할 수 있다.
context 생성의 주도권은 subscriber에게 있으며, 따라서 context의 전파는 chain의 끝에서부터 올라간다.
위 예제를 보면 Context는
 - Scheduler와 상관없이 참조가 가능하다.
 - context는 subscribe → publish로 전파된다.
를 알 수 있다.
// Kafka Consume with Micrometer Tracing Example
kafkaReceiver
    .receive()
    .flatMap(record -> {
      Observation receiverObservation =
          KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
              KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
              () ->
                  new KafkaRecordReceiverContext(
                      record, "user.receiver", receiverOptions.bootstrapServers()),
              observationRegistry);

      return Mono.just(record)
          .doOnNext(r -> log.info("topic: {}, key: {}, value: {}", r.topic(), r.key(), r.value()))
          .doOnTerminate(receiverObservation::stop)
          .doOnError(receiverObservation::error)
          .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));})
    .subscribe();
위 예제는 Micrometer에서 reactor-kafka로 receive할 때 tracing을 하는 예시이다.
전체 flow는
1. record와 topic을 기준으로 Observation 객체 생성
2. record 처리
3. terminate 또는 error 시에 observation의 stop 및 error 처리 호출
4. contextWrite로 observation 객체 context에 put
이다.
contextWrite는 subscribe 시점에 위로 적용해야 upstream으로 전달되기 때문에 contextWrite에서 receiverObservation 인스턴스를 넣었다.
Context의 기능 상, subscribe를 포함한 전체 로직을 개발자가 해야하기 때문에, Micrometer에서 contextWrite 기능을 library에서 제공할 수 없다.
따라서, 위 예제를 참고해 observation을 개발자가 직접 consume 로직 구현시 마다 넣어주어야 한다.

 


Conclusion

Hooks 및 Context는 reactor의 부가 기능이며, 직접적인 활용보다는, Library의 기능들을 이해하는데 필요하다.
Hooks는 Reactor Chaining 전반에 적용되는 기능으로, 신중하게 사용해야 한다.
Context는 Subscribe하는 시점에 상위 chain에서 참조할 값으로 사용할 수 있지만, 메인 sequance에 관여하기에는 제한적인 기능이다.

 


REFERENCE

https://projectreactor.io/docs/core/release/reference/#hooks

https://projectreactor.io/docs/core/release/reference/#context

https://projectreactor.io/docs/core/release/api/reactor/core/Scannable.Attr.html

 

 

반응형

'개발 일지' 카테고리의 다른 글

TURN Protocol (+ STUN Message)  (1) 2024.03.18
ProjectReactor Case Study  (0) 2024.02.22
Hot vs Cold Publisher (Projectreactor Advanced Features)  (0) 2024.02.19
Clean Code in Reactive  (1) 2024.01.25
GraphQL Overview  (1) 2024.01.15