Hot questions for Spring WebClient

Top 10 Java Open Source / Spring / Spring WebClient

Question:

i'd like to retry the request 3 times after waiting 10sec when response is 5xx. but i don't see a method that I can use. On object

WebClient.builder()
                .baseUrl("...").build().post()
                .retrieve().bodyToMono(...)

i can see methods:

retrying on condition with retry count but no delay

.retry(3, {it is WebClientResponseException && it.statusCode.is5xxServerError} )

retrying with backoff and number of times but no condition

.retryBackoff 

there is also a retryWhen but i'm not sure how to use it


Answer:

You can do this taking the following approach:

  • Use the exchange() method to obtain the response without an exception, and then throw a specific (custom) exception on a 5xx response (this differs from retrieve() which will always throw WebClientResponseException with either a 4xx or 5xx status);
  • Intercept this specific exception in your retry logic;
  • Use reactor-extra - it contains a nice way to use retryWhen() for more complex & specific retries. You can then specify a random backoff retry that starts after 10 seconds, goes up to an arbitrary time and tries a maximum of 3 times. (Or you can use the other available methods to pick a different strategy of course.)

For example:

//...webclient
.exchange()
.flatMap(clientResponse -> {
    if (clientResponse.statusCode().is5xxServerError()) {
        return Mono.error(new ServerErrorException());
    } else {
        //Any further processing
    }
}).retryWhen(
    Retry.anyOf(ServerErrorException.class)
       .randomBackoff(Duration.ofSeconds(10), Duration.ofHours(1))
       .maxRetries(3)
    )
);

Question:

The way I'm setting header is below:

import org.springframework.web.reactive.function.client.WebClient;

WebClient webClient = WebClient.create();
webClient.post().uri(url)
        .headers(httpHeaders -> httpHeaders.setAll(headersMap))
        .body(BodyInserters.fromFormData(HelperMethods.mapToMultiValueMap(body))).exchange();

It works for some services and but where I'm adding custom header base of requirement I'm facing issue.

For setting content type i added following in headers in headersMap(headersMap is a Map): ("Content-Type", "application/json")

But its giving me error: "The HTTP header line ["Content-Type": "application/json"] does not conform to RFC 7230 and has been ignored"

What may be causing this ? I tried sending content type like: ("content-type", "application/json"), But the error is same.

I cannot set header in request using ".contentType()", as number of header is variable which is set dynamically in headersMap.


Answer:

You're sending form data (usually Content-Type: multipart/form-data) with content type pointing to json - send a proper JSON or change your header to appropriate for form data.

Question:

How many concurrent requests can I send if the remote service if blocking? Means: what is the maxConnection pool limit that spring uses internally when using WebClient?

@Autowired
private WebClient webClient;

webClient.post().uri(url).syncBody(req).retrieve().bodyToMono(type);

And moreover: how can I modify it?


Answer:

Before reactor-netty 0.9.0.M4 version there wasn't limit by default because of "elastic" connection provider was used. This fix changed it to "fixed" connection provider with the limit of 500.

To change the connection pool limit you could define your own WebClient.Builder bean and use it to create WebClient

@Bean
public WebClient.Builder webClientBuilder() {
    String connectionProviderName = "myConnectionProvider";
    int maxConnections = 100;
    int acquireTimeout = 1000;
    HttpClient httpClient = HttpClient.create(ConnectionProvider
            .fixed(connectionProviderName, maxConnections, acquireTimeout));
    return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient));
}

Or you could implement custom org.springframework.boot.web.reactive.function.client.WebClientCustomizer in the same manner with the predefined WebClient.Builder

Question:

I'm new to WebClient and reactive programming. I want to get the response body from a request. In case of an error the http-code, headers and body must be logged, but the body should still be returned.

After lots of digging and googling I found two solutions. But both look over complicated to me. Is there a simpler solution?

Staying with a Mono I found this solution:

public Mono<String> log(ProtocolLine protocolLine) {
    return webClient.post()
            .uri("/log")
            .body(BodyInserters.fromObject(protocolLine))
            .exchange()
            .flatMap(clientResponse -> {
                Mono<String> stringMono = clientResponse.bodyToMono(String.class);
                CompletableFuture<String> stringCompleteFuture = new CompletableFuture<String>();
                Mono<String> bodyCompletedMono = Mono.fromFuture(stringCompleteFuture);
                if (clientResponse.statusCode().isError()) {
                    stringMono.subscribe(bodyString -> {
                        LOGGER.error("HttpStatusCode = {}", clientResponse.statusCode());
                        LOGGER.error("HttpHeaders = {}", clientResponse.headers().asHttpHeaders());
                        LOGGER.error("ResponseBody = {}", bodyString);
                        stringCompleteFuture.complete(bodyString);
                    });
                }

                return bodyCompletedMono;
            });
}

Based on Flux it takes less code. But I think I should not use Flux if I know that there will be only one result.

public Flux<String> log(ProtocolLine protocolLine) {
    return webClient.post()
            .uri("/log")
            .body(BodyInserters.fromObject(protocolLine))
            .exchange()
            .flux()
            .flatMap(clientResponse -> {
                Flux<String> stringFlux = clientResponse.bodyToFlux(String.class).share();
                if (clientResponse.statusCode().isError()) {
                    stringFlux.subscribe(bodyString -> {
                        LOGGER.error("HttpStatusCode = {}", clientResponse.statusCode());
                        LOGGER.error("HttpHeaders = {}", clientResponse.headers().asHttpHeaders());
                        LOGGER.error("ResponseBody = {}", bodyString);
                    });
                }

                return stringFlux;
            });
}

Answer:

both solutions are ugly and wrong. You should almost never subscribe in the middle of a reactive pipeline. The subscriber is usually the calling client, not your own application.

    public Mono<String> log(ProtocolLine protocolLine) {
    return webClient.post()
            .uri("/log")
            .body(BodyInserters.fromObject(protocolLine))
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(String.class)
                .doOnSuccess(body -> {
                    if (clientResponse.statusCode().isError()) {
                        log.error("HttpStatusCode = {}", clientResponse.statusCode());
                        log.error("HttpHeaders = {}", clientResponse.headers().asHttpHeaders());
                        log.error("ResponseBody = {}", body);
                    }
            }));
}

Here you can see the way of thinking. We always take our clientResponse and map its body to a string. We then doOnSuccess when this Mono is consumed by the subscriber (our calling client) and check the status code if there is an error and if that is the case we log.

The doOnSuccess method returns void so it doesn't "consume" the mono or anything, it just triggers something when this Mono says it "has something in itself", when it's "done" so to speek.

This can be used with Flux the same way.

Question:

My following WebClient is working fine with internet connection but not through our proxy connection.

WebClient webClient = WebClient.builder()
        .baseUrl("https://targetsite.com")
        .build();

webClient.post()
    .uri("/service/serviceName")
    .body(BodyInserters.fromObject(reqData))
    .retrieve()
    .bodyToMono(WebServiceResponse.class)

Event though, the same client is working through proxy, if I set it as mentioned below,

HttpClient httpClient = HttpClient.create()
            .tcpConfiguration(tcpClient -> tcpClient
                .proxy(proxy -> proxy
                    .type(ProxyProvider.Proxy.HTTP)
                    .host("ourproxy.com")
                    .port(8080)));

ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);

WebClient webClient = WebClient.builder()
        .clientConnector(connector)
        .baseUrl("https://targetsite.com")
        .build();

webClient.post()
    .uri("/service/serviceName")
    .body(BodyInserters.fromObject(reqData))
    .retrieve()
    .bodyToMono(WebServiceResponse.class)

But if I set the same Proxy details either using System.setProperty("http.proxyHost","ourproxy.com"); System.setProperty("http.proxyPort","8080"); or JVM run-time arguments -Dhttp.proxyHost=ourproxy.com -Dhttp.proxyPort=8080

WebClient webClient = WebClient.builder()
        .baseUrl("https://targetsite.com")
        .build();

System.setProperty("http.proxyHost", "ourproxy.com");
System.setProperty("http.proxyPort", "8080");

webClient.post()
    .uri("/service/serviceName")
    .body(BodyInserters.fromObject(reqData))
    .retrieve()
    .bodyToMono(WebServiceResponse.class)

The calls are getting failed with UnknownHostException like,

[04/11/2019 12:32:43.031 IST] DEBUG [reactor-http-epoll-3] [PooledConnectionProvider:254] - Creating new client pool [http] for targetsite.com:443
[04/11/2019 12:32:43.033 IST] DEBUG [reactor-http-epoll-3] [PooledConnectionProvider:254] - [id: 0xe4a0dc15] Created new pooled channel, now 0 active connections and 1 inactive connections
[04/11/2019 12:32:43.045 IST] DEBUG [reactor-http-epoll-3] [SslProvider:254] - [id: 0xe4a0dc15] SSL enabled using engine SSLEngineImpl and SNI targetsite.com:443
[04/11/2019 12:32:43.046 IST] DEBUG [reactor-http-epoll-3] [BootstrapHandlers:254] - [id: 0xe4a0dc15] Initialized pipeline DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.sslReader = reactor.netty.tcp.SslProvider$SslReadHandler), (BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.decompressor = io.netty.handler.codec.http.HttpContentDecompressor), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
[04/11/2019 12:32:43.165 IST] ERROR [reactor-http-epoll-2] [AbstractErrorWebExceptionHandler:117] - [13ebf1eb] 500 Server Error for HTTP POST "/service/serviceName"
java.net.UnknownHostException: targetsite.com: Name or service not known

Please help, why my code is not functioning if I set the proxy details via JVM run-time arguments or system properties.

Actually I want to avoid code level proxy setting. So please guide me to correct my code or approach, so that I can use JVM run-time argument option.


Answer:

The problem is HttpClient by default won't use Java VM proxy settings. You need to explicitly make it use them. So try the following;

HttpClient httpClient = HttpClient.create()
            .useSystemProperties();

And use that HttpClient to create WebClient set the proxy settings for the JVM either programmatically or using VM Args.

System.setProperty("http.proxyHost","ourproxy.com"); 
System.setProperty("http.proxyPort","8080"); 

-Dhttp.proxyHost=ourproxy.com -Dhttp.proxyPort=8080

Question:

I need to implement following behavior:

  • Make a REST post request
  • If response returns with a status 429 Too many requests, retry up to 3 times with a delay of 1 second
  • If the third retry fails or any other error occurs, log and write something to the database
  • If the request was successful (http status 200), log some information

I would like to use Spring WebClient for this purpose and came up with this code:

Mono<ClientResponse> response = webClient.post()
            .uri(URI.create("/myuri"))
            .body(BodyInserters.fromObject(request))
            .retrieve()
            .onStatus(httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS), 
                      response -> Mono.error(new TooManyRequestsException("System is overloaded")))
            .bodyToMono(ClientResponse.class)
            .retryWhen(Retry.anyOf(TooManyRequestsException.class)
                                          .fixedBackoff(Duration.ofSeconds(1)).retryMax(3))
            .doOnError(throwable -> saveToDB(some_id, throwable))
            .subscribe(response -> logResponse(some_id, response));

Now I would like to test if the retry mechanism and error handling works as I expect. May be I could use StepVerifier for this purpose, but I just cannot figure out how to use it in my case. Any useful hints?


Answer:

I think that you might be able to test this with a mock web server, e.g. MockWebServer.

@Test
public void testReactiveWebClient() throws IOException
{
    MockWebServer mockWebServer = new MockWebServer();

    String expectedResponse = "expect that it works";
    mockWebServer.enqueue(new MockResponse().setResponseCode(429));
    mockWebServer.enqueue(new MockResponse().setResponseCode(429));
    mockWebServer.enqueue(new MockResponse().setResponseCode(429));
    mockWebServer.enqueue(new MockResponse().setResponseCode(200)
                                  .setBody(expectedResponse));

    mockWebServer.start();

    HttpUrl url = mockWebServer.url("/mvuri");
    WebClient webClient = WebClient.create();

    Mono<String> responseMono = webClient.post()
            .uri(url.uri())
            .body(BodyInserters.fromObject("myRequest"))
            .retrieve()
            .onStatus(
                    httpStatus -> httpStatus.equals(HttpStatus.TOO_MANY_REQUESTS),
                    response -> Mono.error(new TestStuff.TooManyRequestsException("System is overloaded")))
            .bodyToMono(String.class)
            .retryWhen(Retry.anyOf(TestStuff.TooManyRequestsException.class)
                               .fixedBackoff(Duration.ofSeconds(1)).retryMax(3));

    StepVerifier.create(responseMono)
            .expectNext(expectedResponse)
            .expectComplete().verify();

    mockWebServer.shutdown();
}

If you enqueue another MockResponse with a statuscode 429, the verification will fail, same with e.g. errorcode 500.