Using CompletableFuture within Filter Function

I have a use case in which I want to filter out a few elements of a list based on a network call that I perform for each element. To accomplish this I am using streams, filter and CompletableFuture. The goal is to execute the calls asynchronously so that the operation becomes efficient. The pseudocode for this is shown below.

public List<Integer> afterFilteringList(List<Integer> initialList){
   List<Integer> afterFilteringList =initialList.stream().filter(element -> {
        boolean valid = true;
        try{
            valid = makeNetworkCallAndCheck().get();
        } catch (Exception e) {

        }
        return valid;
    }).collect(Collectors.toList());

    return afterFilteringList;
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
   return CompletableFuture.completedFuture(resultOfNetWorkCall(value));
 }

My question is: am I actually performing this operation asynchronously? (Since I call get inside the filter, will it block and make the execution sequential?) Or is there a better way to do this asynchronously using CompletableFuture and filter in Java 8?

When you call get immediately, you are indeed destroying the benefit of asynchronous execution. The solution is to collect all asynchronous jobs first, before joining.

public List<Integer> afterFilteringList(List<Integer> initialList){
    Map<Integer,CompletableFuture<Boolean>> jobs = initialList.stream()
        .collect(Collectors.toMap(Function.identity(), this::makeNetworkCallAndCheck));
    return initialList.stream()
        .filter(element -> jobs.get(element).join())
        .collect(Collectors.toList());
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
   return CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value));
}

Of course, the method makeNetworkCallAndCheck has to initiate a truly asynchronous operation as well. Calling a method synchronously and returning a completedFuture is not sufficient. I provided a simple example of an asynchronous operation here, but for I/O operations, you likely want to provide your own Executor, tailored to the number of simultaneous connections you want to allow.
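
A minimal sketch of that last point, assuming a hypothetical stand-in for resultOfNetWorkCall (here it simply keeps even numbers) and an arbitrary pool size of 10. It starts every call on a dedicated executor before joining any of them. It also keeps the futures in a list aligned with the input rather than in a map, because Collectors.toMap throws an IllegalStateException when the input contains duplicate elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ExecutorFilterSketch {

    // Hypothetical stand-in for the real network call: keeps even numbers.
    static boolean resultOfNetWorkCall(Integer value) {
        try {
            Thread.sleep(100); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value % 2 == 0;
    }

    static List<Integer> afterFilteringList(List<Integer> initialList, ExecutorService executor) {
        // Phase 1: start every call on the given executor; nothing blocks here.
        List<CompletableFuture<Boolean>> jobs = initialList.stream()
            .map(value -> CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value), executor))
            .collect(Collectors.toList());
        // Phase 2: join in input order; all calls are already in flight.
        return IntStream.range(0, initialList.size())
            .filter(i -> jobs.get(i).join())
            .mapToObj(initialList::get)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The pool size is an assumption; size it to the number of
        // simultaneous connections you want to allow.
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            // prints [2, 2, 4]
            System.out.println(afterFilteringList(Arrays.asList(1, 2, 2, 3, 4), executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

The pool size bounds how many calls run at once, which is the knob you usually want for I/O.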

If you use get(), the execution will not be asynchronous:

get(): Waits if necessary for this future to complete, and then returns its result.

If you want to process all the requests asynchronously, you can use CompletableFuture.allOf():

public List<Integer> filterList(List<Integer> initialList){
    List<Integer> filteredList = Collections.synchronizedList(new ArrayList<>());
    AtomicInteger atomicInteger = new AtomicInteger(0);
    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[initialList.size()];
    initialList.forEach(x->{
        completableFutures[atomicInteger.getAndIncrement()] = CompletableFuture
            .runAsync(()->{
                if(makeNetworkCallAndCheck(x)){
                    filteredList.add(x);
                }
        });
    });

    CompletableFuture.allOf(completableFutures).join();
    return filteredList;
}

private Boolean makeNetworkCallAndCheck(Integer value){
    // TODO: write the logic;
    return true;
}
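
Because each task appends to the shared list as it finishes, the order of the result in the code above follows completion order rather than input order. A possible variant is sketched below, with a hypothetical check predicate standing in for makeNetworkCallAndCheck. It keeps one future per element and reads the results only once allOf has completed, so the input order is preserved. The caller gets a CompletableFuture that it can either join or chain further.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AllOfFilterSketch {

    // 'check' is a hypothetical stand-in for the blocking network call.
    static CompletableFuture<List<Integer>> filterListAsync(List<Integer> initialList,
                                                            Predicate<Integer> check) {
        // One future per element, at the same index as the element.
        List<CompletableFuture<Boolean>> futures = initialList.stream()
            .map(x -> CompletableFuture.supplyAsync(() -> check.test(x)))
            .collect(Collectors.toList());

        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // Every future is complete at this point, so join() does not block.
            .thenApply(ignored -> IntStream.range(0, initialList.size())
                .filter(i -> futures.get(i).join())
                .mapToObj(initialList::get)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Integer> result =
            filterListAsync(Arrays.asList(5, 12, 7, 20, 3), x -> x > 6).join();
        System.out.println(result); // prints [12, 7, 20]
    }
}
```

This version needs neither a synchronized list nor a counter, since no task writes to shared state.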

Collection.parallelStream() is an easy way to process the elements of a collection in parallel. You can modify your code as follows:

public List<Integer> afterFilteringList(List<Integer> initialList){
    List<Integer> afterFilteringList =initialList
            .parallelStream()
            .filter(this::makeNetworkCallAndCheck)
            .collect(Collectors.toList());

    return afterFilteringList;
}
public Boolean makeNetworkCallAndCheck(Integer value){
    return resultOfNetWorkCall(value);
}

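You can also run the parallel stream on your own pool by submitting it to a custom ForkJoinPool. The result order is still guaranteed, because collecting an ordered parallel stream into a list preserves the encounter order.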

I wrote the following code to verify what I said.

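import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class DemoApplication {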
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(50);
        final List<Integer> integers = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            integers.add(i);
        }
        long before = System.currentTimeMillis();
        List<Integer> items = forkJoinPool.submit(() ->
                integers
                        .parallelStream()
                        .filter(it -> {
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            return true;
                        })
                        .collect(Collectors.toList()))
                .get();
        long after = System.currentTimeMillis();
        System.out.println(after - before);
    }
}

I created my own ForkJoinPool, and it took 10019 milliseconds to finish 50 jobs in parallel, although each one takes 10000 milliseconds.
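
The ordering claim can be checked in the same way. The sketch below is a hypothetical variation of the demo, with a shorter sleep and a filter that keeps even numbers. It relies on the same behaviour of running a parallel stream inside a task submitted to a Fork/Join pool, so it inherits whatever caveats apply to that approach. It also shuts the pool down afterwards, which the demo above omits.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelOrderSketch {

    static List<Integer> filterEvenInPool(List<Integer> input, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Submitting the whole pipeline makes the stream run on this pool.
            return pool.submit(() -> input.parallelStream()
                    .filter(it -> {
                        sleepQuietly(50); // simulate a slow check
                        return it % 2 == 0;
                    })
                    .collect(Collectors.toList()))
                .join();
        } finally {
            pool.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.range(0, 20).boxed().collect(Collectors.toList());
        // prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
        System.out.println(filterEvenInPool(input, 20));
    }
}
```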

Comments
  • You forgot to pass element to makeNetworkCallAndCheck. Also, it looks weird that the exceptional case is considered "valid".
  • @Holger The title mismatches the actual question I believe. Am I doing this operation in an Async way itself? ... IMHO the calls are not async and the gets would be blocking...thoughts?
  • @nullpointer well, calling get() immediately destroys the benefit of asynchronous execution, no doubt, but I don’t know, what to suggest as solution, given this code. E.g. the input List<Integer> magically becomes a List<Long> during the stream operation, to eventually be returned as List<Integer>. I guess, it’s supposed to be the same Integer objects all the time, but I don’t want to write code based on assumptions…
  • There is no guarantee that the result list has the right order with this code.
  • You don't need an AtomicInteger here, because it is not being shared with another thread.
  • The disadvantage of parallel streams is the lack of control over the executor, as for I/O operations, you usually don’t want the default pool parallelism, which is tailored to the number of CPU cores.
  • @Holger I thought so too, and even mentioned it in my post, until I saw this answer: stackoverflow.com/a/22269778/5053214
  • Well, that’s an undocumented side effect without official support. And it only works for Fork/Join pools, not for arbitrary Executor implementations.