
Converting RxJava Observables to Java 8 Completable future and back

Tuesday, April 28th, 2015

I have been working on a small project that converts between different types of Java futures. The most challenging conversion is to and from the RxJava Observable. The reason is simple: an RxJava Observable is quite different from, say, a Java 8 CompletableFuture. An Observable can produce multiple values, while a future can handle at most one. An Observable revolves around a non-blocking push paradigm, while futures mix blocking and non-blocking approaches in one API. Nevertheless, the conversion is quite straightforward.

Let's say that we want to create a CompletableFuture that completes with the single item produced by an Observable. It's quite simple.

public class ObservableCompletableFuture<T> extends CompletableFuture<T> {
    private final Subscription subscription;
    public ObservableCompletableFuture(Observable<T> observable) {
        subscription = observable.single().subscribe(
                this::complete,
                this::completeExceptionally
        );
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        subscription.unsubscribe();
        return super.cancel(mayInterruptIfRunning);
    }
}

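We extend CompletableFuture and subscribe to the Observable. The subscribe method accepts two actions: the first is called when an item is produced, the second upon error. Calling single() on the Observable ensures that exactly one item reaches the future. If the Observable completes without emitting anything, or emits more than one item, single() signals an error and the future completes exceptionally.

The only remaining piece is to unsubscribe when the future is cancelled, which the overridden cancel method does.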
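To look at the cancel propagation in isolation, here is a minimal sketch that uses only the JDK. The CancelHookFuture class and its onCancel hook are hypothetical stand-ins for the Subscription held by ObservableCompletableFuture; the real class calls subscription.unsubscribe() instead of running a Runnable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in: a future that runs a hook when it is cancelled,
// the same way ObservableCompletableFuture unsubscribes from the Observable.
class CancelHookFuture<T> extends CompletableFuture<T> {
    private final Runnable onCancel;

    CancelHookFuture(Runnable onCancel) {
        this.onCancel = onCancel;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Release the upstream resource first, then mark the future as cancelled.
        onCancel.run();
        return super.cancel(mayInterruptIfRunning);
    }
}

class CancelHookDemo {
    // Returns true when cancelling the future also triggered the hook.
    static boolean cancelRunsHook() {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        CancelHookFuture<String> future =
                new CancelHookFuture<>(() -> unsubscribed.set(true));
        boolean cancelled = future.cancel(true);
        return cancelled && future.isCancelled() && unsubscribed.get();
    }

    public static void main(String[] args) {
        System.out.println("hook ran on cancel: " + cancelRunsHook());
    }
}
```

The hook runs before super.cancel so that the upstream source is released even if someone is already waiting on the future.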

What about the opposite direction, if we want to convert CompletableFuture to Observable? It's simple as well.

public class CompletableFutureObservable<T> extends Observable<T> {
    CompletableFutureObservable(CompletableFuture<T> completableFuture) {
        super(onSubscribe(completableFuture));
    }

    private static <T> OnSubscribe<T> onSubscribe(final CompletableFuture<T> completableFuture) {
        return subscriber -> {
            completableFuture.thenAccept(value -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            }).exceptionally(throwable -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(throwable);
                }
                return null;
            });
        };
    }
}

We only have to describe what happens when someone subscribes to the Observable: we register two callbacks on the CompletableFuture. The first one, registered with thenAccept, calls onNext and onCompleted when the value is ready. The other one calls onError in case of an error. Please note that we do not cancel the future when the subscriber unsubscribes. We cannot, since we do not know how many subscriptions are registered. And even if we knew, we would not know where else the future is used.

Apart from that, the implementation is pretty straightforward. Please note how helpful the lambdas and method references are; without them the code would be much more verbose. If you are interested in the details, do not hesitate to check the code.
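The reason for not cancelling the future on unsubscribe can be illustrated with plain JDK code. The sketch below is only an illustration, not part of the library: the class and method names are made up, and two dependent stages play the role of two subscribers sharing one future.

```java
import java.util.concurrent.CompletableFuture;

class SharedFutureDemo {
    // Counts how many dependent stages fail when the shared source is cancelled.
    static int failedDependentsAfterCancel() {
        CompletableFuture<String> shared = new CompletableFuture<>();

        // Two independent consumers of the same future, like two subscribers.
        CompletableFuture<Integer> first = shared.thenApply(String::length);
        CompletableFuture<String> second = shared.thenApply(String::toUpperCase);

        // Imagine the first consumer loses interest and cancels the source.
        shared.cancel(false);

        int failed = 0;
        if (first.isCompletedExceptionally()) {
            failed++;
        }
        if (second.isCompletedExceptionally()) {
            failed++;
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("dependents failed: " + failedDependentsAfterCancel());
    }
}
```

Cancelling the shared future fails every consumer, including the one that was still interested in the value.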

Converting ListenableFutures to CompletableFutures and back

Wednesday, June 11th, 2014

Java 8 introduced CompletableFutures. They build on standard Futures and add completion callbacks, future chaining and other useful features. But the world did not wait for Java 8, and a lot of libraries added their own variants of ListenableFuture that serve the same purpose. Even today, some library authors are reluctant to add support for CompletableFutures. It makes sense: Java 8 is quite new, and it's not easy to support CompletableFutures and stay compatible with Java 7 at the same time.

Luckily, it's easy to convert to CompletableFutures and back. Let's take the Spring 4 ListenableFuture as an example. How do we convert it to a CompletableFuture?

static <T> CompletableFuture<T> buildCompletableFutureFromListenableFuture(
        final ListenableFuture<T> listenableFuture
) {
    // create an instance of CompletableFuture
    CompletableFuture<T> completable = new CompletableFuture<T>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            // propagate cancel to the listenable future
            boolean result = listenableFuture.cancel(mayInterruptIfRunning);
            super.cancel(mayInterruptIfRunning);
            return result;
        }
    };

    // add callback
    listenableFuture.addCallback(new ListenableFutureCallback<T>() {
        @Override
        public void onSuccess(T result) {
            completable.complete(result);
        }

        @Override
        public void onFailure(Throwable t) {
            completable.completeExceptionally(t);
        }
    });
    return completable;
}

We just create a CompletableFuture instance and add a callback to the ListenableFuture. In the callback we notify the CompletableFuture that the underlying task has finished. We can even propagate the call to cancel if we want to. That's all you need to convert to a CompletableFuture.

What about the opposite direction? It's more or less straightforward as well.

class ListenableCompletableFutureWrapper<T> implements ListenableFuture<T> {
    private final ListenableFutureCallbackRegistry<T> callbackRegistry
            = new ListenableFutureCallbackRegistry<>();

    private final Future<T> wrappedFuture;

    ListenableCompletableFutureWrapper(CompletableFuture<T> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
        wrappedFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                if (ex instanceof CompletionException && ex.getCause() != null) {
                    callbackRegistry.failure(ex.getCause());
                } else {
                    callbackRegistry.failure(ex);
                }
            } else {
                callbackRegistry.success(result);
            }
        });
    }

    @Override
    public void addCallback(ListenableFutureCallback<? super T> callback) {
        callbackRegistry.addCallback(callback);
    }


    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return wrappedFuture.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return wrappedFuture.isCancelled();
    }

    @Override
    public boolean isDone() {
        return wrappedFuture.isDone();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return wrappedFuture.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return wrappedFuture.get(timeout, unit);
    }
}

We just wrap the CompletableFuture and again register a callback. The only non-obvious part is the use of ListenableFutureCallbackRegistry, which keeps track of registered ListenableFutureCallbacks. We also have to do some exception processing: when the failure arrives wrapped in a CompletionException, we pass its cause to the callbacks. That's all.

If you need to do something like this, I have good news. I have wrapped the code into a reusable library, so you do not have to copy and paste it; you can just use it as described in the library documentation.
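The exception processing is easier to follow with a small JDK-only experiment. The sketch below is an illustration with made-up class and method names; it shows that a dependent stage sees the original failure wrapped in a CompletionException, which is why the wrapper checks for it and passes the cause on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class UnwrapDemo {
    // Same rule as in the wrapper: prefer the cause of a CompletionException.
    static Throwable unwrap(Throwable ex) {
        if (ex instanceof CompletionException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }

    // Returns the exception that whenComplete receives on a dependent stage.
    static Throwable seenByDependentStage(RuntimeException failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(String::trim);

        AtomicReference<Throwable> seen = new AtomicReference<>();
        dependent.whenComplete((result, ex) -> seen.set(ex));

        // Failing the source fails the dependent stage in the same thread.
        source.completeExceptionally(failure);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable seen = seenByDependentStage(new IllegalStateException("boom"));
        System.out.println("callback received: " + seen.getClass().getSimpleName());
        System.out.println("after unwrapping:  " + unwrap(seen).getClass().getSimpleName());
    }
}
```

A future that is failed directly with completeExceptionally hands the raw exception to its own callbacks, so the unwrapping has to cope with both cases.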

What are ListenableFutures good for?

Tuesday, June 10th, 2014

Standard Futures have been in Java since version 5, but recently they have started to run out of breath. Nowadays we have asynchronous servlets, asynchronous HTTP clients and a lot of other asynchronous libraries. Good old Futures are not good enough for connecting those pieces together.

Let's demonstrate it with a simple example. It's just one of many possible examples, but I think it's the most representative one. Imagine we have a simple servlet controller that just calls some backend API, processes the result and returns it. If you think about it, you will notice that we do not need any thread for most of the processing time; we are just waiting for the response. On one end we have an asynchronous servlet and on the other an asynchronous HTTP client. We just need to connect those two. In the following example I will be using Spring 4 features, but it's easy to adapt it to your favorite library.

@Controller
@EnableAutoConfiguration
public class ListenableFutureAsyncController {
    // Let's use Apache Async HTTP client
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate(
        new HttpComponentsAsyncClientHttpRequestFactory()
    );

    @RequestMapping("/")
    @ResponseBody
    DeferredResult<String> home() {
        // Create DeferredResult with timeout 5s
        final DeferredResult<String> result = new DeferredResult<>(5000);

        // Let's call the backend
        ListenableFuture<ResponseEntity<String>> future = 
            restTemplate.getForEntity("http://www.google.com", String.class);

        future.addCallback(
          new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onSuccess(ResponseEntity<String> response) {
                // Will be called in HttpClient thread
                log("Success");
                result.setResult(response.getBody());
            }

            @Override
            public void onFailure(Throwable t) {
                result.setErrorResult(t.getMessage());
            }
        });
        // Return the thread to servlet container, 
        // the response will be processed by another thread.
        return result;
    }

    public static void log(Object message) {
        System.out.println(String.format("%s %s ", Thread.currentThread().getName(), message));
    }

    // That's all you need to start the application
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ListenableFutureAsyncController.class, args);
    }
}

The method starts by creating a DeferredResult. It's a handy abstraction around asynchronous servlets. If we return a DeferredResult, the servlet container thread is returned to the pool and another one is later used for sending the result. To send the result, we have to call either setResult or setErrorResult from another thread.

In the next step we call the backend. We use the Spring 4 AsyncRestTemplate, which is able to wrap the Apache Async HTTP Client. It returns a ListenableFuture, and we can use callbacks to say what to do when the backend request succeeds or fails. Then it's straightforward to return the result. Please note that the callback is called on the HttpClient I/O dispatcher thread. That's fine in our simple case, but for more CPU-intensive tasks we would have to use another thread pool.

The callback is what's important; that's what makes ListenableFutures different. In fact, it's the only difference between the Spring ListenableFuture and the standard Java 5 Future. It's a small difference, but without it we would not be able to implement the example above.
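Handing CPU-heavy work over to a different pool could look roughly like the following JDK-only sketch. It is an illustration under stated assumptions, not the controller's code: a CompletableFuture stands in for the ListenableFuture, and the thread names "io-dispatcher" and "cpu-worker" are invented for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadDemo {
    // Returns the name of the thread that ran the expensive processing step.
    static String threadUsedForProcessing() {
        // Hypothetical dedicated pool for CPU-heavy work.
        ExecutorService cpuPool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "cpu-worker");
            thread.setDaemon(true);
            return thread;
        });
        try {
            CompletableFuture<String> response = new CompletableFuture<>();

            // thenApplyAsync with an explicit executor moves the work
            // off the thread that completes the future.
            CompletableFuture<String> processed = response.thenApplyAsync(
                    body -> Thread.currentThread().getName(), cpuPool);

            // Simulate the I/O dispatcher thread delivering the response.
            new Thread(() -> response.complete("body"), "io-dispatcher").start();

            // Only the demo blocks here, to report which thread did the work.
            return processed.join();
        } finally {
            cpuPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("processed on: " + threadUsedForProcessing());
    }
}
```

The I/O thread only completes the future and returns immediately; the processing itself runs on the worker pool.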

If we write the code like this, we need a thread only for commencing the request and for sending the response back. The rest is handled asynchronously by NIO.
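The difference between waiting on a plain Future and reacting in a callback can be sketched with the JDK alone. In this illustration a CompletableFuture stands in for the ListenableFuture, and the class, method and thread names are made up for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallbackVsBlockingDemo {
    // Plain Future: the caller has to park a thread in get() to learn the result.
    static String handledByWithPlainFuture() {
        ExecutorService backend = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = backend.submit(() -> "response");
            future.get(); // blocks the calling thread until the value is ready
            return Thread.currentThread().getName();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            backend.shutdown();
        }
    }

    // Callback style: the thread that completes the future runs the callback.
    static String handledByWithCallback() {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> handledBy =
                future.thenApply(value -> Thread.currentThread().getName());

        new Thread(() -> future.complete("response"), "backend-thread").start();

        // Only the demo waits here, to report the thread name.
        return handledBy.join();
    }

    public static void main(String[] args) {
        System.out.println("plain Future handled by: " + handledByWithPlainFuture());
        System.out.println("callback handled by:     " + handledByWithCallback());
    }
}
```

With the plain Future the requesting thread stays occupied for the whole wait, while with the callback it is free as soon as the callback is registered.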

You may also notice that the asynchronous controller is much more complicated than the equivalent synchronous code. It's more similar to Node.js code than to what we are used to in Java. We have those strange callbacks, different threads processing the same request and other conceptually complicated stuff. So, if you do not need to scale to thousands of concurrent requests, you might want to think twice about using this approach. But if you need to scale, this is the direction to go. Next time we will re-implement the same example using Java 8 CompletableFutures to see if it looks better.

The full source code is available here. It uses Spring Boot so the class you have seen is really all you need.