Category Archives: Articles in English

Converting ListenableFutures to CompletableFutures and back

Java 8 introduced CompletableFutures. They build on standard Futures and add completion callbacks, future chaining and other useful stuff. But the world did not wait for Java 8 and lot of libraries added different variants of ListenableFutures which serve the same purpose. Even today are some library authors reluctant to add support for CompletableFutures. It makes sense, Java 8 is quite new and it’s not easy to add support for CompletableFutures and be compatible with Java 7 at the same time.

Luckily it’s easy to convert to CompletableFutures and back. Let’s take Spring 4 ListenableFutures as an example. How to convert it to CompletableFuture?

static <T> CompletableFuture<T> buildCompletableFutureFromListenableFuture(
		final ListenableFuture<T> listenableFuture
	) {
        //create an instance of CompletableFuture
        CompletableFuture<T> completable = new CompletableFuture<T>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // propagate cancel to the listenable future
                boolean result = listenableFuture.cancel(mayInterruptIfRunning);
                super.cancel(mayInterruptIfRunning);
                return result;
            }
        };

        // add callback
        listenableFuture.addCallback(new ListenableFutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                completable.complete(result);
            }

            @Override
            public void onFailure(Throwable t) {
                completable.completeExceptionally(t);
            }
        });
        return completable;
    }

We just create a CompletableFuture instance and add a callback to the ListenableFuture. In the callback method we just notify the CompletableFuture that the underlying task has finished. We can even propagate call to cancel method if we want to. That’s all you need to convert to CompletableFuture.

What about the opposite direction? It’s more or less straightforward as well

class ListenableCompletableFutureWrapper<T> implements ListenableFuture<T> {
    private final ListenableFutureCallbackRegistry<T> callbackRegistry 
					= new ListenableFutureCallbackRegistry<>();

    private final Future<T> wrappedFuture;

    ListenableCompletableFutureWrapper(CompletableFuture<T> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
        wrappedFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                if (ex instanceof CompletionException && ex.getCause() != null) {
                    callbackRegistry.failure(ex.getCause());
                } else {
                    callbackRegistry.failure(ex);
                }
            } else {
                callbackRegistry.success(result);
            }
        });
    }

    @Override
    public void addCallback(ListenableFutureCallback<? super T> callback) {
        callbackRegistry.addCallback(callback);
    }


    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return wrappedFuture.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return wrappedFuture.isCancelled();
    }

    @Override
    public boolean isDone() {
        return wrappedFuture.isDone();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return wrappedFuture.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return wrappedFuture.get(timeout, unit);
    }
}

We just wrap the CompletableFuture and again register a callback. The only non-obvious part is the use of ListenableFutureCallbackRegistry which keeps track of registered ListenableFutureCallbacks. We also have to do some exception processing, but that’s all.

If you need to do something like this, I have good news. I have wrapped the code to a reusable library, so you do not have to copy and paste the code, you can just use it as described in the library documentation.

What are ListenableFutures good for?

Standard Futures are in Java since version 5. But recently they started to lose breath. Nowadays we do have asynchronous servlets, asynchronous HTTP clients and lot of other asynchronous libraries. Good old Futures are not good enough for connecting those pieces together.

Let’s demonstrate it on a simple example. It’s just one of many possible examples, but I think it’s the most representative one. Imagine we have a simple servlet controller that just calls some backend API, processes the result and returns it. If you think about it you will notice that we do not need any thread for most of the processing time, we are just waiting for the response. On one end we do have asynchronous servlet and on the other asynchronous HTTP client. We just need to connect those two. In the following example I will be using Spring 4 features, but it’s easy to transform it to your favorite library.

@Controller
@EnableAutoConfiguration
public class ListenableFutureAsyncController {
    // Let's use Apache Async HTTP client
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate(
        new HttpComponentsAsyncClientHttpRequestFactory()
    );

    @RequestMapping("/")
    @ResponseBody
    DeferredResult<String> home() {
        // Create DeferredResult with timeout 5s
        final DeferredResult<String> result = new DeferredResult<>(5000);

        // Let's call the backend
        ListenableFuture<ResponseEntity<String>> future = 
            restTemplate.getForEntity("http://www.google.com", String.class);

        future.addCallback(
          new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onSuccess(ResponseEntity<String> response) {
                // Will be called in HttpClient thread
                log("Success");
                result.setResult(response.getBody());
            }

            @Override
            public void onFailure(Throwable t) {
                result.setErrorResult(t.getMessage());
            }
        });
        // Return the thread to servlet container, 
        // the response will be processed by another thread.
        return result;
    }

    public static void log(Object message) {
        System.out.println(format("%s %s ",Thread.currentThread().getName(), message));
    }

    // That's all you need to start the application
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ListenableFutureAsyncController.class, args);
    }
}

The method starts by creating DeferredResult. It’s a handy abstraction around asynchronous servlets. If we return DeferredResult, servlet container thread is returned to the pool and another one is later used for sending the result. To send the result, we have to either call setResult or setErrorResult method from another thread.

In the next step we call the backend. We use Spring 4 AsyncRestTemplate which is able to wrap Apache Async HTTP Client. It returns ListenableFuture and we can use callbacks to say what to do when the backend request succeeds or fails. Then it’s straightforward to return the result. Please note that the callback is called using HttpClient I/O dispatcher thread. It’s fine in our simple case but for more CPU intensive task we would have to use another thread-pool.

The callback is what’s important, that’s what makes ListenableFutures different. In fact, it’s the only difference between Spring ListenableFuture and standard Java 5 future. It’s a small difference, but without it we would not be able to implement the example above.

If we write the code like this, we need a thread only for commencing the request and for sending the response back. The rest is handled asynchronously by NIO.

You can also notice that the code is much more complicated than equivalent synchronous code. It’s more similar to NodeJS code than to what we are used to in Java. We do have those strange callbacks, different threads processing the same request and other conceptually complicated stuff. So, if you do not need to scale to thousands concurrent requests, you might want to think twice about using this approach. But if you need to scale, this is the direction to go. Next time we will re-implement the same example using Java 8 Completable futures to see if it looks better.

The full source code is available here. It uses Spring Boot so the class you have seen is really all you need.

CompletableFutures – why to use async methods?

I have been playing with Java 8 CompletableFutures and one thing has been bothering me – shall I use methods with Async suffix or the variant without it. Let’s take a look at the following example:

CompletableFuture.supplyAsync(() -> getUser(userId))
                .thenApply(CreditRatingService::getCreditRatingSystem1)
                .thenAccept(System.out::println);

private static User getUser(int id) {...}

private static CreditRating getCreditRatingSystem1(User user) {...}

Here I want to get some data about a user from a remote system, then call another system to get his credit rating and once it is finished, I want to print the result. All of the tasks may take some time, so I do not want to block the main thread, hence the use of completable futures.

If you are not yet familiar with Java 8 syntax, () -> getUser(userId) is a lambda calling getUser method. CreditRatingService::getCreditRatingSystem1 and System.out::println are method references. Underneath, the supplyAsync method creates a task which gets the user details. The task is submitted to fork-join thread pool and when it finishes, the result is passed to the next task. When the next task finishes, its result is sent further and so on. It’s quite neat and simple.

The question is, whether I should use the previous variant or this one.

CompletableFuture.supplyAsync(() -> getUser(userId))
                .thenApplyAsync(CreditRatingService::getCreditRatingSystem1)
                .thenAcceptAsync(System.out::println);

The difference is in the async suffix on the method names. The methods without async execute their task in the same thread as the previous task. So in the first example, all getUser, getCreditRating and println are executed in the same thread. It’s OK, it’s still a thread from the fork-join pool, so the main thread is not blocked.

The second variant always submits the succeeding task to the pool, so each of the tasks can be handled by different thread. The result will be the same, the first variant is a bit more effective due to less thread switching overhead. It does not make any sense to use the async variant here, so what is it good for? It was not clear to me, until extended the example to download the credit rating data from two different systems.

CompletableFuture<User> user = CompletableFuture.supplyAsync(() -> getUser(userId));

CompletableFuture<CreditRating> rating1 = 
    user.thenApplyAsync(CreditRatingService::getCreditRatingSystem1);
CompletableFuture<CreditRating> rating2 = 
    user.thenApplyAsync(CreditRatingService::getCreditRatingSystem2);

rating1
    .thenCombineAsync(rating2, CreditRating::combine)
    .thenAccept(System.out::println);

Here I have two actions waiting for user details. Once the details are available, I want to get two credit ratings from two different systems. Since I want to do it in parallel, I have to use at least one async method. Without async, the code would use only one thread so both credit rating tasks would be executed serially.

I have added combine phase that waits for both credit rating tasks to complete. It’s better to make this async too, but from different reason. Without async, the same thread as in rating1 would be used. But you do not want to block the thread while waiting for the rating2 task. You want to return it to the pool and get a thread only when it is needed.

When both tasks are ready, we can combine the result and print it in the same thread, so the last thenAccept is without async.

As we can see, both method variants are useful. Use async, when you need to execute more tasks in parallel or you do not want to block a thread while waiting for a task to finish. But since it is not easy to reason about such programs, I would recommend to use async everywhere. It might be little bit less effective, but the difference is negligible. For very small small price, it gives you the freedom to delegate your thinking to the machine. Just make everything async and let the fork-join framework care about the optimization.