반응형
개요
Webclient는 Spring Webflux에 포함된 외부 요청 / 응답에 대한 Handling을 담당하는 모듈이다.
기존 RestTemplate의 요청 <-> 응답 사이의 동기성을 해결하고, 보다 효과적으로 서버 리소스를 사용하기 위해 등장했다.
내부적으로 비동기적으로 구성되어 있으며, 구현은 Spring Webflux를 활용했다.
WebClient 사용 분석
WebClient는 총 4단계로 요청 -> 응답 처리를 추상화하였다.
Http Config → Reqeust Config → Exchange → Response Handling
각각
- http 기본 설정 (socket 설정 포함)
- 요청 정의
- 발송
- 응답 처리
1. Build with Config
WebClient.builder()
....
.build()
DefaultWebClientBuilder
public WebClient build() {
ClientHttpConnector connectorToUse = this.connector != null ? this.connector : this.initConnector();
ExchangeFunction exchange = this.exchangeFunction == null ? ExchangeFunctions.create(connectorToUse, this.initExchangeStrategies()) : this.exchangeFunction;
ExchangeFunction filteredExchange = this.filters != null ? (ExchangeFunction)this.filters.stream().reduce(ExchangeFilterFunction::andThen).map((filter) -> {
return filter.apply(exchange);
}).orElse(exchange) : exchange;
HttpHeaders defaultHeaders = this.copyDefaultHeaders();
MultiValueMap<String, String> defaultCookies = this.copyDefaultCookies();
return new DefaultWebClient(filteredExchange, this.initUriBuilderFactory(), defaultHeaders, defaultCookies, this.defaultRequest, new DefaultWebClientBuilder(this));
}
사실상 Req ↔︎ Res 전 구간의 설정 및 설정 구현체의 reference를 가지고 있다.
구현체 내부적으로 Config를 가지고 있으며,
Config를 통해 동일한 설정을 가진 webclient builder를 만들 수 있다. (mutate)
기본적으로 동작할 설정을 가진 구현체를 생성하여, 재활용한다.
(WebClient는 builder를 멤버 가지고 있으며 mutate 시 해당 builder를 리턴해서 재사용성을 높였다)
2. Request 준비
Request를 생성하기 위한 각종 정보를 세팅하는 작업
Uri, Header, Body 등 Request에 필요한 정보를 interface 형태로 spec을 정의하고, 최종 구현체가 모든 정보를 가지고 전달할 수 있게 한다.
3. Exchange to get ResponseSpec
public WebClient.ResponseSpec retrieve() {
return new DefaultResponseSpec(this.exchange(), this::createRequest);
}
public Mono<ClientResponse> exchange() {
ClientRequest request = this.inserter != null ? this.initRequestBuilder().body(this.inserter).build() : this.initRequestBuilder().build();
return Mono.defer(() -> {
Mono<ClientResponse> responseMono = DefaultWebClient.this.exchangeFunction.exchange(request).checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]").switchIfEmpty(DefaultWebClient.NO_HTTP_CLIENT_RESPONSE_ERROR);
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
return responseMono;
});
}
private HttpRequest createRequest() {
return new HttpRequest() {
private final URI uri = DefaultRequestBodyUriSpec.this.initUri();
private final HttpHeaders headers = DefaultRequestBodyUriSpec.this.initHeaders();
public HttpMethod getMethod() {
return DefaultRequestBodyUriSpec.this.httpMethod;
}
public String getMethodValue() {
return DefaultRequestBodyUriSpec.this.httpMethod.name();
}
public URI getURI() {
return this.uri;
}
public HttpHeaders getHeaders() {
return this.headers;
}
};
}
ClientResponse을 받는 메서드
1. Request Spec으로부터 정보를 받아 ClientRequest를 생성
2. ExchangeFuction을 통해 exchange 수행 (ExchangeFuction은 후술)
3. ResponseSpec 생성 (Exchange 결과(Mono) + Request 객체)
4. Response Handling
ClientResponse를 처리할 수 있는 메서드를 지원한다.
Configuration
Builder Method | Desc | Example |
baseUrl(String baseUrl) | 기본 Url 세팅 | .baseUrl("http://localhost") |
defaultUriVariables(Map<String, ?> defaultUriVariables) | 기본 Uri 변수 세팅 최종 URL의 {}값을 치환 |
.defaultUriVariables(Map.of("path1", "값1", "query1", "값2")) |
uriBuilderFactory(UriBuilderFactory uriBuilderFactory) | 내부적으로 사용되는 BuilderFactory를 customize | .uriBuilderFactory(new DefaultUriBuilderFactory()) |
defaultHeader(String header, String... values) | 기본 Header 세팅 | .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) |
defaultHeaders(Consumer<HttpHeaders> headersConsumer) | Consumer를 accept(this.initHeaders()) | Consumer<HttpHeaders> headersConsumer = httpHeaders -> httpHeaders.setContentType(MediaType.APPLICATION_JSON); WebClient.builder() .defaultHeaders(headersConsumer) |
defaultCookie(String cookie, String... values) | 기본 Cookie 세팅 | .defaultCookie("cookie_key", "cookie_value") |
defaultCookies(Consumer<MultiValueMap<String, String>> cookiesConsumer) | cookiesConsumer.accept(this.initCookies()); | Consumer<MultiValueMap<String,String>> cookiesConsumer = map -> map.add("defaultCookie", "value"); WebClient.builder() .defaultCookies(cookiesConsumer) |
defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest) | defaultRequest가 있으면 andThen으로 추가 없으면 defaultRequest로 세팅 |
Consumer<WebClient.RequestHeadersSpec<?>> defaultRequest = requestHeadersSpec -> requestHeadersSpec.attribute("test", "q"); WebClient.builder() .defaultRequest(defaultRequest) |
filter(ExchangeFilterFunction filter) | webClient용 filter 추가 request / response handling |
.filter(ExchangeFilterFunction.ofRequestProcessor(this::loggingRequestInfo)) // 로깅용 필터를 구현하여 add |
filters(Consumer<List<ExchangeFilterFunction>> filtersConsumer) | Filter Consumer를 accept(this.initFilters()) | |
clientConnector(ClientHttpConnector connector) | ClientHttpConnector 세팅 자세한 사항은 후술 |
.clientConnector(new ReactorClientHttpConnector(HttpClient.create())) |
codecs(Consumer<ClientCodecConfigurer> configurer) | configure Encoding / Decoding clientCodecConfigurer를 통해 register 자세한 사항은 후술 |
.codecs( clientCodecConfigurer -> { clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder()); clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder()); } |
exchangeStrategies(ExchangeStrategies strategies) | exchange의 stratege를 세팅 자세한 사항은 후술 |
|
@Deprecated exchangeStrategies(Consumer<ExchangeStrategies.Builder> configurer) |
deprecate | |
exchangeFunction(ExchangeFunction exchangeFunction) | connector와 strategies를 가진 ExchangeFuction을 세팅 자세한 사항은 후술 |
|
apply(Consumer<Builder> builderConsumer) | Consumer를 accept(this) | Consumer<WebClient.Builder> baseUrlToB = builder -> builder.baseUrl("b.com"); WebClient.builder() .baseUrl("a.com") .apply(baseUrlToB); |
WebClient Components 분석
ClientConnector
public interface ClientHttpConnector {
Mono<ClientHttpResponse> connect(HttpMethod method, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback);
}
request 정보 + Callback Function을 받아서 Mono로 Response를 전달하는 method를 가진 interface
HTTP 통신의 최종 목적이 되는 메서드이다.
구현체들 마다 차이가 있지만, 대부분 비슷한 로직을 가졌으며, 내부적으로 HttpClient 구현체가 다르다.
ex) ReactorClientHttpConnector → Netty의 HttpClient,
JettyClientHttpConnector → jetty.client의 HttpClient를 가진다
ClientCodecConfigurer
Encoder 및 Decoder를 세팅하기 위한 Config Interface
핵심은 추상클래스의 BaseCodecConfigurer의 멤버인 BaseDefaultCodecs, DefaultCustomCodecs를 Config 하는 기능 제공
세팅된 Reader와 Writer를 제공해 주는 핵심 메서드
모양을 보면 defaultCodec에 customCodec을 apply 해서 제공한다.
BaseDefaultCodecs
기본적으로 지원하는 코덱에 대한 세팅이 되어있는 클래스
특히 jackson2JsonDecoder, Encoder를 많이 사용하게 된다.
추가적으로 코덱 단에서 설정할 수 있는 부분이 있는데
maxInMemorySize(int size)
- Encode, Decode시의 Message의 최대크기를 정할 수 있다.
- default: 256KB
enableLoggingRequestDetails(boolean var1)
- Requset / Response에 대한 상세로깅을 활성화할 수 있음
- 기본적으로 Authorization Header 등 민감 정보가 올 수 있는 부분 때문에, default = false
DefaultCustomCodecs
BaseCodecConfigurer가 가진 내부 final 클래스
customCodec을 apply 하기 위해 일종의 custom encoder, decoder의 홀더 역할을 한다.
핵심 메서드인 private void addCodec(Object codec, boolean applyDefaultConfig)을 통해 readers와 writers를 세팅한다.
ExchangeFunction
ExchangeFunction interface
Request를 받아서 Response를 받아오는 작업의 interface
filter apply 메서드와 함께 구현해야 함
ExchangeFunctions
ExchangeFunction을 생성할 수 있는 Util 클래스
내부적으로 ExchangeFunction을 구현한 DefaultExchangeFunction을 가지고 있다.
DefaultExchangeFunction
Connector와 ExchangeStrategies를 가지고 있다.
connector를 통해 connect를 수행하고, exchangeStrategies를 통해 message read / write를 수행한다.
default로 logging이 false인데, strategies가 들어오면 전략에 동봉된 코덱을 확인
public Mono<ClientResponse> exchange(ClientRequest clientRequest) {
Assert.notNull(clientRequest, "ClientRequest must not be null");
HttpMethod httpMethod = clientRequest.method();
URI url = clientRequest.url();
return this.connector.connect(httpMethod, url, (httpRequest) -> {
return clientRequest.writeTo(httpRequest, this.strategies);
}).doOnRequest((n) -> {
this.logRequest(clientRequest);
}).doOnCancel(() -> {
ExchangeFunctions.logger.debug(clientRequest.logPrefix() + "Cancel signal (to close connection)");
}).onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, (t) -> {
return this.wrapException(t, clientRequest);
}).map((httpResponse) -> {
String logPrefix = this.getLogPrefix(clientRequest, httpResponse);
this.logResponse(httpResponse, logPrefix);
return new DefaultClientResponse(httpResponse, this.strategies, logPrefix, httpMethod.name() + " " + url, () -> {
return this.createRequest(clientRequest);
});
});
}
1. connector를 통해 connect 수행
- connect의 callback 메서드로 request와 strategies를 수행하도록 세팅
2. logging 및 Exception 처리 등록
3. map 체이닝으로 ClientResponse를 생성
Message Write
private static <M extends ReactiveHttpOutputMessage> Mono<Void> writeWithMessageWriters(
M outputMessage, BodyInserter.Context context, Object body, ResolvableType bodyType, @Nullable ReactiveAdapter adapter) {
Publisher<?> publisher;
if (body instanceof Publisher) {
publisher = (Publisher<?>) body;
}
else if (adapter != null) {
publisher = adapter.toPublisher(body);
}
else {
publisher = Mono.just(body);
}
MediaType mediaType = outputMessage.getHeaders().getContentType();
return context.messageWriters().stream()
.filter(messageWriter -> messageWriter.canWrite(bodyType, mediaType))
.findFirst()
.map(BodyInserters::cast)
.map(writer -> write(publisher, bodyType, mediaType, outputMessage, context, writer))
.orElseGet(() -> Mono.error(unsupportedError(bodyType, context, mediaType)));
}
BodyInserters에서 Body를 write 할 때 MessageWriter 중 첫 번째 것을 사용한다.
fromValue → writeWithMessageWriters(…)
Message Read
private static <T, S extends Publisher<T>> S readWithMessageReaders(
ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType,
Function<HttpMessageReader<T>, S> readerFunction,
Function<UnsupportedMediaTypeException, S> errorFunction,
Supplier<S> emptySupplier) {
if (VOID_TYPE.equals(elementType)) {
return emptySupplier.get();
}
MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType())
.orElse(MediaType.APPLICATION_OCTET_STREAM);
return context.messageReaders().stream()
.filter(reader -> reader.canRead(elementType, contentType))
.findFirst()
.map(BodyExtractors::<T>cast)
.map(readerFunction)
.orElseGet(() -> {
List<MediaType> mediaTypes = context.messageReaders().stream()
.flatMap(reader -> reader.getReadableMediaTypes(elementType).stream())
.collect(Collectors.toList());
return errorFunction.apply(
new UnsupportedMediaTypeException(contentType, mediaTypes, elementType));
});
}
BodyExtractor에서 등록된 messageReader 중 첫 번째 reader를 통해 메시지를 Read 한다.
toMono / toFlux → readWithMessageReaders(…)
WebClient 간단 세팅 Template
WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient.create()
// SSL 인증서 무시 설정
.secure(SslProvider.builder()
.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE))
.defaultConfiguration(SslProvider.DefaultConfigurationType.NONE)
// Handshake Timeout tjfwjd
.handshakeTimeoutMillis(2000)
.build())
// DNS Resolver 설정
.resolver(DefaultAddressResolverGroup.INSTANCE) // Sets the JVM built-in resolver.
// Connection Timeout 설정
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.applicationProperties.getTcpClientConnectionTimeout())
.doOnConnected(connection ->
connection
// ReadTimeout 설정
.addHandlerLast(new ReadTimeoutHandler(this.applicationProperties.getTcpClientReadTimeout()))
// WriteTimeout 설정
.addHandlerLast(new WriteTimeoutHandler(this.applicationProperties.getTcpClientWriteTimeout())))))
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> {
// Date Size 무제한 설정
configurer.defaultCodecs().maxInMemorySize(-1);
// 살세 로깅 enable
configurer.defaultCodecs().enableLoggingRequestDetails(true);
})
.build())
// MDC로 transactionId 넣는 설정
.filter(mdcFilter)
// Req, Res 로깅 필터 등록 설정
.filter(ExchangeFilterFunction.ofRequestProcessor(WebClientConfig::loggingRequestInfo))
.filter(ExchangeFilterFunction.ofResponseProcessor(WebClientConfig::loggingResponseInfo))
.build();
private static Mono<ClientRequest> loggingRequestInfo(final ClientRequest request) {
StringBuffer sb = new StringBuffer();
sb.append("Req 3rd > ").append(request.method())
.append(", URI=").append(request.url())
.append(", Header=").append(request.headers().isEmpty() ? "" : JsonUtil.buildJsonStr(request.headers()))
.append(", Body=").append(JsonUtil.buildJsonStr(request.body()));
log.info(sb.toString());
return Mono.just(request);
}
private static Mono<ClientResponse> loggingResponseInfo(final ClientResponse response) {
log.info("Res 3rd > {}", response.statusCode());
return Mono.just(response);
}
private static final ExchangeFilterFunction mdcFilter = (request, next) -> {
// here runs on main(request's) thread
Map<String, String> map = MDC.getCopyOfContextMap();
return next.exchange(request)
.doOnEach(value -> {
// here runs on reactor's thread
if (map != null) {
MDC.setContextMap(map);
}
});
};
핵심 요약
WebClient는 REST Client를 Webflux에 맞게 인터페이스 및 설정을 제공
WebClient Instance는 Immutable
mutate를 통해서 요청 / 응답 설정에 대한 커스텀만 하며, 기본적인 인스턴스는 하나다.
Http 이하 계층과, 비즈니스 부분이 분리될 수 있도록 설계됨 (Connector 설정이 되어있다면…)
WebClient는 총 4단계로 요청 → 응답이 처리를 추상화하였다
- Http Config → Reqeust Config → Exchange → Response Handling
HttpClientConnector는 최초 요청 시 Pool을 만든다. (Provider의 역할)
Timeout 설정은 총 5가지 정도 있음
- Connection Timeout
- Handshake Timeout
- Read Timeout
- Write Timeout
- Response Timeout
REST 요청 중 여러 구간에서 Handler를 달 수 있음
- Request 전 / 후
- Response 전 / 후
- Connection 전 / 후
- Serialize, Deserialize 시
REFERENCE
https://docs.spring.io/spring-framework/reference/web/webflux-webclient.html
반응형
'개발 일지' 카테고리의 다른 글
Kafka Consumer #1 Startup / Rebalance Flow (0) | 2023.06.14 |
---|---|
Expand Kafka Cluster (0) | 2023.06.13 |
HttpClient (Netty) Configuration 정리 (0) | 2023.05.25 |
Kafka Connect 사용성 검토 (0) | 2023.05.24 |
Kafka Schema Registry (0) | 2023.05.23 |