Asynchronous sequential calls based on condition checks in reactor


I am trying to make asynchronous, non-blocking calls using Reactor. For each request, I may have to call two services in sequence (in my case below, getAccountDataFromAAA and getAccountDataFromBBB).

Here is my ItemRequest object:

public class ItemRequest {
    private Account account;
    private Result firstServiceResult;
    private Result secondServiceResult;
    private PostingParameterCode postingParameterCode; //enum 
    //...
    //...
    //getters and setters
}

So, my request input will contain multiple itemRequests and for each itemRequest, I am doing asynchronous calls as:

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest); 
    //here, it will enter into a sequential call for each itemRequest
}

This is my first service call interface:

public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);

This is my second service call interface:

public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);

This method makes up to two calls in sequence, depending on the conditions:

public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
    Mono<ItemRequest> firstCallResult = Mono.empty();
    Mono<ItemRequest> secondCallResult = Mono.empty();

    if (isFirstServiceCallRequired(itemRequest)) {
        firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
        //basically, firstService call will update the accountKey information and
        //will also set the result status to OK which is required to decide
        //whether to make secondService call.
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    //Now, before calling the second service, I need to check the following:
    if (null != itemRequest.getFirstServiceResult() &&
        itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
        itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
        secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
    }

    //attaching the firstCallResult and secondCallResult to produce a single Mono
    return firstCallResult.then(secondCallResult);
}

This is working fine when firstCallResult is not required. But when the first call is required, this condition check will not pass since I won't have first call result object updated:

if(null != itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) { ... }
 //this condition check will not pass because first service call is not actually executing

Both cases work fine if I add the following statement:

if (isFirstServiceCallRequired(itemRequest)) {
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    firstCallResult.block(); //adding this line makes both cases work
}

But I don't think I will get Reactor's benefits this way, since block() waits for the first call to finish. I was thinking of structuring the logic like this:

Mono<ItemRequest> result = firstService.call(...)
    .doOnNext(/*do something */)
    .then( ... secondService.call())

But I couldn't figure out how to chain the secondService call after the firstService call to get a single Mono result while keeping those condition checks. The condition check is important since I don't always want to execute the second service. Is there any way to chain the two calls and still apply the condition checks?

Apologies for the long question. Any suggestions/help would be greatly appreciated.


After offering the bounty points to this question, I was really excited and expecting some answers. Anyway, I was able to improve my initial solution and keep those condition checks too.

I did the following: I changed the return type from Mono<ItemRequest> to Mono<Void> in both service calls, since I am basically just updating the data on each ItemRequest in the list.

Handling the parallel calls here (each parallel call contains a sequential call):

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<Void> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
    //here, it will enter into a sequential call for each itemRequest
}

and these are my firstServiceCall and secondServiceCall interface changes:

public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);

public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);

and I chained the secondServiceCall with firstServiceCall to get the mono result and have those condition checks too as:

public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
    Mono<Void> callSequence = Mono.empty();

    if (isFirstServiceCallRequired(itemRequest)) {
        callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    return callSequence.thenEmpty(Mono.defer(() -> {
        //note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to
        //for each subscriber downstream. The lambda therefore runs only after callSequence completes.
        //only if the firstServiceCall result is successful & other condition check successful,
        //I am calling secondServiceCall:
        if (shouldCallSecondService(itemRequest)) {
            return this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
        } else {
            return Mono.empty();
        }
    }));
}
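
Why the original check failed and the deferred one works can be seen in a small, self-contained sketch. It is not the code from the question: Request, firstCall and secondCall are hypothetical stand-ins for ItemRequest and the two services, and the services are simulated with Mono.fromRunnable. The eager variant evaluates the condition while the pipeline is being assembled, before anything has been subscribed to; the deferred variant evaluates it only after the first Mono has completed.

```java
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class DeferredCheckSketch {

    // Hypothetical stand-in for ItemRequest: holds only the status set by the first call.
    static class Request {
        volatile String firstStatus; // stays null until the first call has actually run
    }

    // Hypothetical first service: the side effect happens on subscription, not on creation.
    static Mono<Void> firstCall(Request r, List<String> log) {
        return Mono.fromRunnable(() -> {
            r.firstStatus = "OK";
            log.add("first");
        });
    }

    // Hypothetical second service.
    static Mono<Void> secondCall(List<String> log) {
        return Mono.fromRunnable(() -> log.add("second"));
    }

    // Condition checked at assembly time: firstStatus is still null, so the second call is skipped.
    static List<String> eagerCheck() {
        Request r = new Request();
        List<String> log = new ArrayList<>();
        Mono<Void> second = "OK".equals(r.firstStatus) ? secondCall(log) : Mono.empty();
        firstCall(r, log).then(second).block();
        return log;
    }

    // Condition checked inside Mono.defer: it runs after the first call has completed.
    static List<String> deferredCheck() {
        Request r = new Request();
        List<String> log = new ArrayList<>();
        firstCall(r, log)
            .then(Mono.defer(() -> {
                if ("OK".equals(r.firstStatus)) {
                    return secondCall(log);
                }
                return Mono.<Void>empty();
            }))
            .block();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(eagerCheck());    // prints [first]
        System.out.println(deferredCheck()); // prints [first, second]
    }
}
```

The block() calls are only there so the sketch can print a result; in the real pipeline the returned Mono would be handed back to the Flux.flatMap shown earlier.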


public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
  Mono<ItemRequest> firstCallResult;

  if (isFirstServiceCallRequired(itemRequest)) {
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    //basically, firstService call will update the accountKey information and
    //will also set the result status to OK which is required to decide
    //whether to make secondService call.
  } else {
    //Account key is already present, so just update the result status which I need later.
    firstCallResult = Mono.defer(() -> {
      Result result = new Result();
      result.setStatus(Result.Status.OK);
      result.setMessageText("First call not required as account info is set for item request");
      itemRequest.setFirstServiceResult(result);
      return Mono.just(itemRequest);
    });
  }

  //flatMap runs its function only after firstCallResult has emitted,
  //so the check below sees the updated itemRequest.
  return firstCallResult.flatMap(itReq -> {
    //Now, before calling the second service, I need to check the following:
    if (null != itemRequest.getFirstServiceResult() &&
        itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
        itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
      return this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
    } else {
      //flatMap must return a Mono, so wrap the unchanged request
      return Mono.just(itReq);
    }
  });
}

The following simple example may help in understanding flatMap:

import reactor.core.publisher.Mono;

public class FlatMapExample {

  public static void main(String[] args) {
    //service B is called only when service A does not answer "200"
    callExternalServiceA().flatMap(response -> {
      if (response.equals("200")) {
        return Mono.just(response);
      } else {
        return callExternalServiceB();
      }
    }).block();
  }

  public static Mono<String> callExternalServiceA() {
    return Mono.defer(() -> {
      System.out.println("Call external service A");
      return Mono.just("400");
    });
  }

  public static Mono<String> callExternalServiceB() {
    return Mono.defer(() -> {
      System.out.println("Call external service B");
      return Mono.just("200");
    });
  }
}
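
One caveat when choosing between flatMap and then: flatMap invokes its function only when the upstream Mono emits a value. If the first call completes without emitting (for example because it returns Mono<Void> or Mono.empty()), the function passed to flatMap is never called, whereas then(Mono.defer(...)) still runs once the upstream completes. The following is a minimal sketch of that difference; the class and method names are made up for illustration.

```java
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicBoolean;

public class EmptyUpstreamSketch {

    // Reports whether the flatMap function was invoked for an empty upstream.
    static boolean flatMapOnEmpty() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Mono.<String>empty()
            .flatMap(value -> {
                ran.set(true); // never reached: there is no value to map
                return Mono.just(value);
            })
            .block();
        return ran.get();
    }

    // Reports whether the deferred supplier was invoked for an empty upstream.
    static boolean thenDeferOnEmpty() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Mono.<String>empty()
            .then(Mono.defer(() -> {
                ran.set(true); // reached: then() reacts to completion, not to a value
                return Mono.<Void>empty();
            }))
            .block();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(flatMapOnEmpty());   // prints false
        System.out.println(thenDeferOnEmpty()); // prints true
    }
}
```

So the flatMap approach fits services that return the updated ItemRequest, while then or thenEmpty with Mono.defer fits services that only signal completion.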
