Caching in Spring 5 WebFlux

spring webflux
spring webflux webclient
spring webclient cache
spring webflux logging
spring webflux functional endpoints
webflux vs mvc
spring reactive
spring cache

Are there any way to cache a Flux coming from WebClient in Spring 5? I tried this but is not caching anything.

@RestController
@SpringBootApplication
@EnableCaching
public class GatewayApplication {

 @PostMapping(value ="/test", produces = "application/json")
 public Flux<String> handleRequest(@RequestBody String body) {
    return getHspadQuery(body);
 }

 @Cacheable("testCache")
 private Flux<String> getData (String body) {
    return WebClient.create().post()
            .uri("http://myurl")
            .body(BodyInserters.fromObject(body))
            .retrieve().bodyToFlux(String.class).cache();
 }
}

When I make the third request it never finishs. And in then in the subsequent requests I get the response but the server throws the following:

2018-04-09 12:36:23.920 ERROR 11488 --- [ctor-http-nio-4] r.ipc.netty.channel.ChannelOperations    : [HttpServer] Error processing connection. Requesting close the channel
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:646) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:523) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:897) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
...

And it never caches anything.

Any help would be appreciated.

Thanks.

There is an reactor cache add-on which can be used with Spring CacheManager. But, as pointed out by the comments in the accepted answer, currently, the Spring cache APIs(gets and puts) are still blocking. We can only make the program fully reactive until this issue is solved.

Here is the sample code snippet in java. The complete sample project is here in github.

@Service
public class CatServiceImpl implements CatService {
    private static final String CACHE_NAME = "sr";
    private static final String KEY = "k";
    @Autowired
    private WebClient client;

    @Autowired
    private CacheManager cacheManager;

    @SuppressWarnings("unchecked")
    private Function<String, Mono<List<Signal<CatDto>>>> reader = k -> Mono
            .justOrEmpty((Optional.ofNullable((List<CatDto>) (cacheManager.getCache(CACHE_NAME).get(k, List.class)))))
            .flatMap(v -> Flux.fromIterable(v).materialize().collectList());

    private BiFunction<String, List<Signal<CatDto>>, Mono<Void>> writer = (k, sigs) -> Flux.fromIterable(sigs)
            .dematerialize().collectList().doOnNext(l -> cacheManager.getCache(CACHE_NAME).put(k, l)).then();

    @Override
    public Flux<CatDto> search() {
        Flux<CatDto> fromServer = client.get().retrieve().bodyToFlux(CatDto.class);

        return CacheFlux.lookup(reader, KEY).onCacheMissResume(fromServer).andWriteWith(writer);
    }

}

Web on Reactive Stack - Project Metadata API Guide, Reactive types support for @Cacheable methods [SPR-14235] #17920. Open. spring-issuemaster opened this issue on Apr 29, 2016 · 5 comments. Open  Caching with Spring webflux Spring Webflux is the recent non-blocking version of Spring built upon project-reactor, it’s not fully compatible with many frameworks that still have a blocking

For now, @Cacheable doesn't work with Flux (and Reactor in general). But regarding your example, each time you call the method, you're creating a new Flux instance, so naturally, it never caches anything.

To be able to cache results, you need to either convert a Flux to a list instance, or simply keep reusing one Flux instance

Reactive types support for @Cacheable methods [SPR-14235 , After 5 seconds we attach a second Flux to it, which will start printing out the same numbers. 1 2 3 4 5 6 7 8 9. Flux<  The reactive-stack web framework, Spring WebFlux, has been added Spring 5.0. It is fully non-blocking, supports reactive streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.

Finally I solved it with Mono. I guess is possible with Flux for example using a reduce.

@RestController
@SpringBootApplication
public class Application {

@Autowired
CacheManager manager;


private WebClient client;

@PostConstruct
public void setup() {
    client = WebClient.builder()
            .baseUrl("http://myurl")
            .exchangeStrategies(ExchangeStrategies.withDefaults())
            .build();
}

@Bean
public CacheManager cacheManager() {
    SimpleCacheManager cacheManager = new SimpleCacheManager();
    cacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("hspad")));
    return cacheManager;
}

@PostMapping(value = "/hspad/grahql", produces = "application/json")
public Mono<ResponseEntity<String>> hspadService(@RequestBody String body) {
    return getHspadQuery(body);
}

private Mono<ResponseEntity<String>> getHspadQuery (String body) {
    Mono<ResponseEntity<String>> mono;
    Optional<Cache.ValueWrapper> value = Optional.ofNullable(cacheManager().getCache("hspad").get(body));

    if(value.isPresent()) {
        mono = Mono.just(ResponseEntity.ok(value.get().get().toString()));
    } else {
        mono = client.post()
                .body(BodyInserters.fromObject(body))
                .retrieve().bodyToMono(String.class).map(response ->
                {
        // Care blocking operation! (use cacheManager -not found yet- prepared for reactive)                        cacheManager().getCache("hspad").putIfAbsent(body,response);
                    return ResponseEntity.ok(response);
                });
    }
    return mono;
}

public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
}
}

Flux Caching in Project Reactor: Replaying past data – Reactive , Learn how to build reactive web applications using Spring WebFlux annotations and are important and how they're implemented in Spring framework 5, then you'll get They also allow for easy parallelization and caching. WebClient introduced in Spring 5 is a non-blocking client with support for Reactive Streams. On the client side, we use  WebClient to retrieve data from our endpoints created in EmployeeController. Let's create a simple  EmployeeWebClient : Here we have created a WebClient using its factory method create.

I made an alternative way that annotation & aop based spring reactor cache using reactor cache add-on until spring framework's @Cacheable annotation supports reactive cache.

https://github.com/pkgonan/reactor-cache

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MonoCacheable {

    String value() default "";

}

abstract class AbstractSpringCache<T> {

    protected Cache cache;
    protected Class<T> type;

    protected AbstractSpringCache(Cache cache, Class<T> type) {
        this.cache = cache;
        this.type = type;
    }
}

public class SpringMonoCache<T> extends AbstractSpringCache<T> implements MonoCache<T> {

    public SpringMonoCache(Cache cache, Class<T> type) {
        super(cache, type);
    }

    @Override
    public Mono<T> find(Mono<T> retriever, String key) {
        return CacheMono.lookup(reader, key)
                .onCacheMissResume(retriever)
                .andWriteWith(writer);
    }

    /** Mono Cache reader function **/
    private Function<String, Mono<Signal<? extends T>>> reader = k -> Mono
            .fromCallable(() -> cache.get(k, type))
            .subscribeOn(Schedulers.elastic())
            .flatMap(t -> Mono.justOrEmpty(Signal.next(t)));

    /** Mono Cache writer function **/
    private BiFunction<String, Signal<? extends T>, Mono<Void>> writer = (k, signal) -> Mono
            .fromRunnable(() -> Optional.ofNullable(signal.get())
                    .ifPresent(o -> cache.put(k, o)))
            .subscribeOn(Schedulers.elastic())
            .then();
}

@Aspect
@Component
class ReactorAnnotationCacheAspect {

    ...

    @Around("annotationOfAnyMonoCacheable() && " +
            "executionOfAnyPublicMonoMethod()")
    final Object around(final ProceedingJoinPoint joinPoint) throws Throwable {
        ...

        try {
            return reactorCacheAspectSupport.execute(aspectJInvoker, method, args);
        } catch(...) {}...

        return joinPoint.proceed(args);
    }

    @Pointcut(value = "@annotation(reactor.cache.spring.annotation.MonoCacheable)")
    private void annotationOfAnyMonoCacheable() {}

    @Pointcut(value = "execution(public reactor.core.publisher.Mono *(..))")
    private void executionOfAnyPublicMonoMethod() {}
}


class ReactorCacheAspectSupport {

    private final CacheManager cacheManager;
    ...

    Object execute(final CacheOperationInvoker invoker, final Method method, final Object[] args) {
        ...
        return execute(cache, invoker.invoke(), key, returnType);
    }

    private Object execute(final Cache cache, final Object proceed, final String key, final Class<?> type) {
        ...
        final ReactorCache cacheResolver = getCacheResolver(cache, type);
        return cacheResolver.find(proceed, key);
    }

    ...
}

Mastering Spring Framework 5, Part 2: Spring WebFlux, Spring Webflux and #Cacheable - proper way of caching result of Mono / Flux type setConcurrencyLimit(5); return taskExecutor; } #Bean public Step  The reactive stack, web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports Reactive Streams back pressure, and runs on servers such as Netty, Undertow, and Servlet 3.1+ containers.

Spring Webflux and #Cacheable, Caching with Spring webflux. Spring Webflux is the recent non-blocking version of Spring built upon project-reactor, it's not fully compatible with  Name Email Dev Id Roles Organization; Juergen Hoeller: jhoeller<at>pivotal.io: jhoeller

Spring Distributed Cache with Hazelcast - The Startup, Spring WebFlux is reactive-stack web framework, positioned as a successor of well-known Issue 5 - blocking code in Reactor thread We used similar code to check the cache for a particular value and then call the external  Spring Security builds against Spring Framework 5.2.6.RELEASE but should generally work with any newer version of Spring Framework 5.x. Many users are likely to run afoul of the fact that Spring Security’s transitive dependencies resolve Spring Framework 5.2.6.RELEASE, which can cause strange classpath problems.

Migrating a microservice to Spring WebFlux · allegro.tech, The relevant annotations for caching in Spring are as follows: Below, we'll discuss 5 of the top Redis-based Java objects that Radisson users  It is used to cache the short url x long url relation so that it doesn't burden the database with numerous GETs for the same URL, basic caching. Spring also offers a reactive implementation for Redis, so it is a great idea to use it into the architecture.

Comments
  • Thanks and do you know why the "Could not emit buffer due to lack of requests" error? Keep getting this error even without cache.
  • @EstebanS to be able to answer that I'd need to see the server-side code
  • cacheManager().getCache("hspad").putIfAbsent(body,response); looks like a blocking operation to me - you're likely to run into big scalability issues doing that
  • Thaks Brian, how would you face it? Caching is not already integrated with Spring 5 Reactor.
  • We'd need proper caching support from the CacheManager API first; for that, we'd also need reactive support from caching implementations (are you using one right now?). See also jira.spring.io/browse/SPR-14235
  • Please don't just post some tool or library as an answer. At least demonstrate how it solves the problem in the answer itself.
  • umm.. I didn't know that and i added source code about that.