반응형
개요
RSocket을 Spring에 Integration 하는 코드 분석
RSocket은 Spring-Message 라이브러리를 통해 Spring Context에 Integration 되었다.
ini Server
public abstract class AbstractMethodMessageHandler<T>
implements ReactiveMessageHandler, ApplicationContextAware, InitializingBean, BeanNameAware {
...
@Override
public void afterPropertiesSet() {
List<? extends HandlerMethodArgumentResolver> resolvers = initArgumentResolvers();
...
this.invocableHelper.addArgumentResolvers(resolvers);
List<? extends HandlerMethodReturnValueHandler> handlers = initReturnValueHandlers();
...
this.invocableHelper.addReturnValueHandlers(handlers);
initHandlerMethods();
}
...
}
public class RSocketMessageHandler extends MessageMappingMessageHandler {
...
@Override
public void afterPropertiesSet() {
// Add argument resolver before parent initializes argument resolution
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
getHandlerMethods().forEach((composite, handler) -> {
if (composite.getMessageConditions().contains(RSocketFrameTypeMessageCondition.CONNECT_CONDITION)) {
MethodParameter returnType = handler.getReturnType();
if (getCardinality(returnType) > 0) {
throw new IllegalStateException(
"Invalid @ConnectMapping method. " +
"Return type must be void or a void async type: " + handler);
}
}
});
}
...
}
spring message에서 구현된 reactive 환경의 invoke handle method 관련 클래스
InitializingBean을 구현하여, spring init 시점에 handlerMethod들을 load 한다.
invocableHelper는 추후 payload를 handling할 때 사용
Handle Payload
...
@Override
public Mono<Payload> requestResponse(Payload payload) {
return handleAndReply(payload, FrameType.REQUEST_RESPONSE, Flux.just(payload)).next();
}
...
private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType, Flux<Payload> payloads) {
...
MessageHeaders headers = createHeaders(firstPayload, frameType, responseRef);
...
Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true));
Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers);
return Mono.defer(() -> this.messageHandler.handleMessage(message))
...
}
RSocket 메시지를 파싱하고 fire&forget, request&response, request&stream 등 정보를 바탕으로 호출되는 메서드이다.
1. payload에서 header 파싱
2. payload에서 DataBuffer로 read
3. DataBuffer + header 로 Message 생성
4. messageHandler로 messsage handle
AbstractMethodMessageHandler (spring-messaging)
@Override
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
...
match = getHandlerMethod(message);
...
return handleMatch(match.mapping, match.handlereMthod, message);
}
protected Mono<Void> handleMatch(T mapping, HandlerMethod handlerMethod, Message<?> message) {
handlerMethod = handlerMethod.createWithResolvedBean();
return this.invocableHelper.handleMessage(handlerMethod, message);
}
public Mono<Void> handleMessage(HandlerMethod handlerMethod, Message<?> message) {
InvocableHandlerMethod invocable = initMessageMappingMethod(handlerMethod);
..
return invocable.invoke(message)
.switchIfEmpty(Mono.defer(() -> handleReturnValue(null, invocable, message)))
.flatMap(returnValue -> handleReturnValue(returnValue, invocable, message))
...
}
더보기
private Match<T> getHandlerMethod(Message<?> message) {
List<Match<T>> matches = new ArrayList<>();
RouteMatcher.Route destination = getDestination(message);
List<T> mappingsByUrl = (destination != null ? this.destinationLookup.get(destination.value()) : null);
if (mappingsByUrl != null) {
addMatchesToCollection(mappingsByUrl, message, matches);
}
if (matches.isEmpty()) {
// No direct hits, go through all mappings
Set<T> allMappings = this.handlerMethods.keySet();
addMatchesToCollection(allMappings, message, matches);
}
if (matches.isEmpty()) {
handleNoMatch(destination, message);
return null;
}
Comparator<Match<T>> comparator = new MatchComparator(getMappingComparator(message));
matches.sort(comparator);
if (logger.isTraceEnabled()) {
logger.trace("Found " + matches.size() + " handler methods: " + matches);
}
Match<T> bestMatch = matches.get(0);
if (matches.size() > 1) {
Match<T> secondBestMatch = matches.get(1);
if (comparator.compare(bestMatch, secondBestMatch) == 0) {
HandlerMethod m1 = bestMatch.handlerMethod;
HandlerMethod m2 = secondBestMatch.handlerMethod;
throw new IllegalStateException("Ambiguous handler methods mapped for destination '" +
(destination != null ? destination.value() : "") + "': {" +
m1.getShortLogMessage() + ", " + m2.getShortLogMessage() + "}");
}
}
return bestMatch;
}
handlerMethod
spring-messaging에서 구현한 class이다. method invoke하기 위한 모든 정보를 맴버로 갖는다.
Match
mapping 정보 + handlerMethod를 가진 spring-messageing의 private class
1. Message를 handle할 수 있는 method를 찾는다 (mapping 정보 등..)
2. invoke가 가능한 method 프락시 객체 생성
3. reflection으로 method 실행
반응형
'개발 일지' 카테고리의 다른 글
Windows 10 부팅 지연 이슈 (시스템 예약 위치..) (0) | 2023.11.14 |
---|---|
Spring Webflux Threading (publishOn, subscribeOn) 분석 (1) | 2023.10.30 |
Spring Cloud Gateway Code 분석 (2) | 2023.10.28 |
Spring Webflux, HttpHandler code 분석 (0) | 2023.10.27 |
Spring Cloud Sleuth란 - 분산 Tracing (0) | 2023.09.11 |