Dev Log

Spring RSocket Code Analysis

북극곰은콜라 2023. 10. 29. 13:43


Overview

An analysis of the code that integrates RSocket into Spring.

 


RSocket is integrated into the Spring context through the spring-messaging library.

Init Server

public abstract class AbstractMethodMessageHandler<T>
		implements ReactiveMessageHandler, ApplicationContextAware, InitializingBean, BeanNameAware {
		...
    @Override
	public void afterPropertiesSet() {
		List<? extends HandlerMethodArgumentResolver> resolvers = initArgumentResolvers();
		...
		this.invocableHelper.addArgumentResolvers(resolvers);
		List<? extends HandlerMethodReturnValueHandler> handlers = initReturnValueHandlers();
		...
		this.invocableHelper.addReturnValueHandlers(handlers);
		initHandlerMethods();
	}
	...
}
public class RSocketMessageHandler extends MessageMappingMessageHandler {
    ...
    @Override
	public void afterPropertiesSet() {
		// Add argument resolver before parent initializes argument resolution
		getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());

		super.afterPropertiesSet();

		getHandlerMethods().forEach((composite, handler) -> {
			if (composite.getMessageConditions().contains(RSocketFrameTypeMessageCondition.CONNECT_CONDITION)) {
				MethodParameter returnType = handler.getReturnType();
				if (getCardinality(returnType) > 0) {
					throw new IllegalStateException(
							"Invalid @ConnectMapping method. " +
									"Return type must be void or a void async type: " + handler);
				}
			}
		});
	}
	...
}
These classes are implemented in spring-messaging and take care of invoking handler methods in a reactive environment.
They implement InitializingBean, so the handler methods are loaded when Spring initializes.
invocableHelper is used later, when a payload is handled.
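To make the init-time loading concrete, here is a minimal sketch using only the JDK. It is not the Spring implementation: `Route`, `HandlerRegistrySketch`, and `GreetingHandlerSketch` are hypothetical names invented for this example, with `Route` standing in for a mapping annotation. The point is only that the scan happens once, in an `afterPropertiesSet()`-style hook, before any message arrives.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a mapping annotation; not a Spring annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Route {
    String value();
}

// Hypothetical registry that loads handler methods once, at init time.
class HandlerRegistrySketch {
    private final Map<String, Method> handlerMethods = new LinkedHashMap<>();
    private final Object bean;

    HandlerRegistrySketch(Object bean) {
        this.bean = bean;
    }

    // Plays the role of afterPropertiesSet(): scan the bean for mapped methods.
    void afterPropertiesSet() {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Route route = method.getAnnotation(Route.class);
            if (route == null) {
                continue; // not a handler method
            }
            Method previous = handlerMethods.putIfAbsent(route.value(), method);
            if (previous != null) {
                throw new IllegalStateException("Ambiguous mapping for '" + route.value() + "'");
            }
        }
    }

    Map<String, Method> getHandlerMethods() {
        return handlerMethods;
    }
}

// Hypothetical handler bean used only for this example.
class GreetingHandlerSketch {
    @Route("greet")
    String greet(String name) {
        return "Hello, " + name;
    }

    String notMapped() {
        return "ignored";
    }
}
```

Calling `afterPropertiesSet()` on a registry built around `GreetingHandlerSketch` registers only `greet`; the unannotated method is skipped.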

 


Handle Payload

...
@Override
public Mono<Payload> requestResponse(Payload payload) {
	return handleAndReply(payload, FrameType.REQUEST_RESPONSE, Flux.just(payload)).next();
}
...


private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
	...
	MessageHeaders headers = createHeaders(firstPayload, frameType, responseRef);
	...
	Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true));
	Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers);
	return Mono.defer(() -> this.messageHandler.handleMessage(message))
			...
}
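These methods are called once the RSocket message has been parsed, depending on its interaction type: fire-and-forget, request-response, request-stream, and so on.
1. Parse the headers from the payload
2. Read the payload into a DataBuffer
3. Create a Message from the DataBuffer and the headers
4. Handle the message with messageHandler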
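The four steps above can be sketched with plain JDK types. This is a simplified, synchronous illustration, whereas the real code is reactive and works on `Flux<DataBuffer>`. `RawPayload`, `SimpleMessage`, and the `Function`-based handler are hypothetical stand-ins, not RSocket or Spring classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Every type below is a hypothetical stand-in, not an RSocket or Spring class.
class PayloadPipelineSketch {

    // Stand-in for an RSocket Payload: routing metadata plus raw data bytes.
    static final class RawPayload {
        final String route;
        final byte[] data;

        RawPayload(String route, byte[] data) {
            this.route = route;
            this.data = data;
        }
    }

    // Stand-in for a Spring Message: headers plus a body.
    static final class SimpleMessage {
        final Map<String, Object> headers;
        final byte[] body;

        SimpleMessage(Map<String, Object> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    static String handleAndReply(RawPayload payload, String frameType,
            Function<SimpleMessage, String> messageHandler) {
        // 1. Parse the headers from the payload.
        Map<String, Object> headers = new HashMap<>();
        headers.put("route", payload.route);
        headers.put("frameType", frameType);
        // 2. Read the payload data into a separate buffer.
        byte[] buffer = payload.data.clone();
        // 3. Build a message from the buffer and the headers.
        SimpleMessage message = new SimpleMessage(headers, buffer);
        // 4. Hand the message to the message handler.
        return messageHandler.apply(message);
    }
}
```

Copying the data in step 2 loosely mirrors the idea behind `retainDataAndReleasePayload`: the message body no longer depends on the lifetime of the original payload.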

AbstractMethodMessageHandler (spring-messaging)

@Override
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
	...
	match = getHandlerMethod(message);
	...
	return handleMatch(match.mapping, match.handlerMethod, message);
}

protected Mono<Void> handleMatch(T mapping, HandlerMethod handlerMethod, Message<?> message) {
	handlerMethod = handlerMethod.createWithResolvedBean();
	return this.invocableHelper.handleMessage(handlerMethod, message);
}

// Defined in InvocableHelper (the invocableHelper field used above)
public Mono<Void> handleMessage(HandlerMethod handlerMethod, Message<?> message) {
	InvocableHandlerMethod invocable = initMessageMappingMethod(handlerMethod);
	...
	return invocable.invoke(message)
			.switchIfEmpty(Mono.defer(() -> handleReturnValue(null, invocable, message)))
			.flatMap(returnValue -> handleReturnValue(returnValue, invocable, message))
			...
}
private Match<T> getHandlerMethod(Message<?> message) {
	List<Match<T>> matches = new ArrayList<>();

	RouteMatcher.Route destination = getDestination(message);
	List<T> mappingsByUrl = (destination != null ? this.destinationLookup.get(destination.value()) : null);
	if (mappingsByUrl != null) {
		addMatchesToCollection(mappingsByUrl, message, matches);
	}
	if (matches.isEmpty()) {
		// No direct hits, go through all mappings
		Set<T> allMappings = this.handlerMethods.keySet();
		addMatchesToCollection(allMappings, message, matches);
	}
	if (matches.isEmpty()) {
		handleNoMatch(destination, message);
		return null;
	}
	Comparator<Match<T>> comparator = new MatchComparator(getMappingComparator(message));
	matches.sort(comparator);
	if (logger.isTraceEnabled()) {
		logger.trace("Found " + matches.size() + " handler methods: " + matches);
	}
	Match<T> bestMatch = matches.get(0);
	if (matches.size() > 1) {
		Match<T> secondBestMatch = matches.get(1);
		if (comparator.compare(bestMatch, secondBestMatch) == 0) {
			HandlerMethod m1 = bestMatch.handlerMethod;
			HandlerMethod m2 = secondBestMatch.handlerMethod;
			throw new IllegalStateException("Ambiguous handler methods mapped for destination '" +
					(destination != null ? destination.value() : "") + "': {" +
					m1.getShortLogMessage() + ", " + m2.getShortLogMessage() + "}");
		}
	}
	return bestMatch;
}
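
The selection logic in getHandlerMethod (collect matches, sort, reject ties) can be illustrated with a small JDK-only sketch. It is an assumption-laden simplification: `Candidate`, its trailing-`*` wildcard rule, and the `MOST_SPECIFIC_FIRST` ordering are hypothetical and do not reproduce Spring's mapping conditions. It also skips the direct `destinationLookup` shortcut and always scans every mapping.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class BestMatchSketch {

    // Hypothetical candidate: a pattern plus the name of the handler it maps to.
    static final class Candidate {
        final String pattern;
        final String handler;

        Candidate(String pattern, String handler) {
            this.pattern = pattern;
            this.handler = handler;
        }

        boolean isWildcard() {
            return pattern.endsWith("*");
        }

        // Assumed matching rule: a trailing '*' matches any suffix.
        boolean matches(String destination) {
            if (isWildcard()) {
                return destination.startsWith(pattern.substring(0, pattern.length() - 1));
            }
            return pattern.equals(destination);
        }
    }

    // Assumed ordering: exact patterns first, then longer patterns first.
    static final Comparator<Candidate> MOST_SPECIFIC_FIRST = (a, b) -> {
        int byWildcard = Boolean.compare(a.isWildcard(), b.isWildcard());
        return byWildcard != 0 ? byWildcard : Integer.compare(b.pattern.length(), a.pattern.length());
    };

    static Candidate findBestMatch(List<Candidate> mappings, String destination) {
        List<Candidate> matches = new ArrayList<>();
        for (Candidate candidate : mappings) {
            if (candidate.matches(destination)) {
                matches.add(candidate);
            }
        }
        if (matches.isEmpty()) {
            return null; // the original calls handleNoMatch(...) before returning null
        }
        matches.sort(MOST_SPECIFIC_FIRST);
        Candidate best = matches.get(0);
        // Two equally ranked candidates cannot be told apart, so fail loudly.
        if (matches.size() > 1 && MOST_SPECIFIC_FIRST.compare(best, matches.get(1)) == 0) {
            throw new IllegalStateException("Ambiguous handler methods mapped for destination '"
                    + destination + "': {" + best.handler + ", " + matches.get(1).handler + "}");
        }
        return best;
    }
}
```

As in the original, a tie between the two best candidates is treated as a configuration error rather than resolved arbitrarily.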

 

HandlerMethod

A class implemented in spring-messaging. It holds, as members, all the information needed to invoke a method.

Match

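A private class in spring-messaging that holds the mapping information together with the HandlerMethod.

Overall, message handling proceeds in three steps:
1. Find a method that can handle the Message (using the mapping information, etc.)
2. Create an invocable proxy object for that method
3. Execute the method via reflection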
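
The three steps can be sketched with plain reflection. This is only an illustration under stated assumptions: `HandlerMethodSketch`, `EchoHandler`, `registryFor`, and `dispatch` are hypothetical names, routes are simply method names, and argument resolution is reduced to "String or int from the message body". Spring's argument resolvers and return value handlers are far more general.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ReflectiveInvokeSketch {

    // Hypothetical counterpart of HandlerMethod: the bean plus the method to call on it.
    static final class HandlerMethodSketch {
        final Object bean;
        final Method method;

        HandlerMethodSketch(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }

        // Resolve the single argument from the body, then invoke via reflection.
        Object invoke(String body) {
            Class<?> paramType = method.getParameterTypes()[0];
            Object arg = (paramType == int.class) ? (Object) Integer.valueOf(body) : body;
            try {
                method.setAccessible(true);
                return method.invoke(bean, arg);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Invocation failed: " + method.getName(), ex);
            }
        }
    }

    // Hypothetical handler bean used only for this example.
    static class EchoHandler {
        public String echo(String text) {
            return "echo:" + text;
        }

        public int twice(int n) {
            return n * 2;
        }
    }

    // Assumption: the route is simply the method name.
    static Map<String, HandlerMethodSketch> registryFor(Object bean) {
        Map<String, HandlerMethodSketch> registry = new HashMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            if (!method.isSynthetic() && method.getParameterCount() == 1) {
                registry.put(method.getName(), new HandlerMethodSketch(bean, method));
            }
        }
        return registry;
    }

    static Object dispatch(Map<String, HandlerMethodSketch> registry, String route, String body) {
        HandlerMethodSketch handler = registry.get(route); // 1. find the handler method
        if (handler == null) {
            throw new IllegalStateException("No handler for '" + route + "'");
        }
        return handler.invoke(body); // 2 and 3. wrap as invocable, run reflectively
    }
}
```

For example, dispatching route `echo` with body `hi` against a registry built from `EchoHandler` calls `echo("hi")` reflectively.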

 
