Converting RxJava Observables to Java 8 CompletableFuture and back

I have been working on a small project that converts between different types of Java futures. The most challenging conversion is to and from the RxJava Observable. The reason is simple: an RxJava Observable is quite different from, say, a Java 8 CompletableFuture. An Observable can produce multiple values, while a future can hold at most one. An Observable revolves around a non-blocking push paradigm, while futures mix blocking and non-blocking approaches in one API. Nevertheless, the conversion is quite straightforward.

Let's say that we want to create a CompletableFuture that completes with the single item emitted by an Observable. It's quite simple.

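import java.util.concurrent.CompletableFuture;

import rx.Observable;
import rx.Subscription;

public class ObservableCompletableFuture<T> extends CompletableFuture<T> {
    private final Subscription subscription;

    public ObservableCompletableFuture(Observable<T> observable) {
        // single() signals an error unless the source emits exactly one item
        subscription = observable.single().subscribe(
                this::complete,
                this::completeExceptionally
        );
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // stop listening to the observable before cancelling the future
        subscription.unsubscribe();
        return super.cancel(mayInterruptIfRunning);
    }
}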

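We extend CompletableFuture and subscribe to the observable in the constructor. The subscribe method accepts two actions: the first is called when an item is produced, the second when an error occurs. Calling single() on the Observable ensures that exactly one item is delivered. If the source completes without emitting anything, or emits more than one item, single() signals an error instead (NoSuchElementException or IllegalArgumentException respectively in RxJava 1.x), and the future completes exceptionally.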
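
To see why restricting the stream to one item matters, here is a small JDK-only sketch (no RxJava involved; the class and method names are made up for illustration). It feeds several values into one CompletableFuture through complete(), the same method the constructor above passes as the item callback. Only the first call takes effect, so without a guard on the observable any later items would be silently dropped.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the converter project.
class CompleteOnceDemo {

    /** Calls complete() once per item and returns the value the future ends up holding. */
    static String firstWins(String... items) {
        CompletableFuture<String> future = new CompletableFuture<>();
        for (String item : items) {
            // complete() returns true only for the call that actually sets the value;
            // every later call leaves the future untouched and returns false.
            future.complete(item);
        }
        // getNow() never blocks: it returns the fallback if nothing was completed.
        return future.getNow(null);
    }
}
```

Calling CompleteOnceDemo.firstWins("a", "b", "c") returns "a", and calling it with no arguments returns null because the future was never completed.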

The only remaining piece is unsubscribing when the future is cancelled, which the overridden cancel() method takes care of.
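
The override is needed because CompletableFuture.cancel() on its own only completes the future with a CancellationException; according to its Javadoc the mayInterruptIfRunning flag has no effect, so nothing upstream is stopped. The following JDK-only sketch shows the same pattern in isolation. HookedFuture and CancelDemo are hypothetical names, and a plain Runnable stands in for Subscription.unsubscribe().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in: the Runnable plays the role of Subscription.unsubscribe().
class HookedFuture<T> extends CompletableFuture<T> {
    private final Runnable onCancel;

    HookedFuture(Runnable onCancel) {
        this.onCancel = onCancel;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Release the upstream resource first, then let the future mark itself cancelled.
        onCancel.run();
        return super.cancel(mayInterruptIfRunning);
    }
}

class CancelDemo {
    /** Returns { result of cancel(), isCancelled(), whether the hook ran }. */
    static boolean[] run() {
        AtomicBoolean released = new AtomicBoolean(false);
        HookedFuture<String> future = new HookedFuture<>(() -> released.set(true));
        boolean cancelled = future.cancel(true);
        return new boolean[] { cancelled, future.isCancelled(), released.get() };
    }
}
```

All three flags returned by CancelDemo.run() are true: the future reports itself as cancelled and the release hook has run.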

What about the opposite direction, if we want to convert CompletableFuture to Observable? It's simple as well.

import java.util.concurrent.CompletableFuture;

import rx.Observable;

public class CompletableFutureObservable<T> extends Observable<T> {
    CompletableFutureObservable(CompletableFuture<T> completableFuture) {
        super(onSubscribe(completableFuture));
    }

    private static <T> OnSubscribe<T> onSubscribe(final CompletableFuture<T> completableFuture) {
        return subscriber -> {
            completableFuture.thenAccept(value -> {
                // deliver the value, then signal completion
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            }).exceptionally(throwable -> {
                // forward the failure to the subscriber
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(throwable);
                }
                return null;
            });
        };
    }
}

We only have to describe what happens when someone subscribes to the observable: we register two callbacks on the CompletableFuture. The first, registered with thenAccept, calls onNext and onCompleted when the value is ready. The second, registered with exceptionally, calls onError if the future fails. Note that we do not cancel the future when the subscriber unsubscribes. We cannot, because the same future may back any number of subscriptions, and even if we knew how many there were, we would not know who else is using the future.
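
One detail of the error path is worth knowing. Because exceptionally is chained onto the stage returned by thenAccept rather than onto the original future, the JDK hands it a CompletionException that wraps the original cause, so that wrapper is what reaches onError in the listing above. The JDK-only sketch below illustrates the difference; ErrorPathDemo and its methods are hypothetical names, and unwrap is just one possible way to recover the cause, not something taken from the project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the converter project.
class ErrorPathDemo {

    /** Throwable seen by exceptionally() chained after thenAccept(). */
    static Throwable seenByChainedExceptionally(Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        future.thenAccept(value -> { }).exceptionally(t -> {
            seen.set(t);
            return null;
        });
        // Callbacks registered without an executor run in the completing thread.
        future.completeExceptionally(cause);
        return seen.get();
    }

    /** Throwable seen by whenComplete() registered directly on the source future. */
    static Throwable seenByWhenComplete(Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        future.whenComplete((value, t) -> seen.set(t));
        future.completeExceptionally(cause);
        return seen.get();
    }

    /** Strips a CompletionException wrapper if one is present. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

With an IllegalStateException as the cause, seenByChainedExceptionally returns a CompletionException whose getCause() is that exception, while seenByWhenComplete returns the exception itself. A subscriber that needs the original exception can either unwrap it as shown or rely on a converter that registers a single whenComplete callback on the source future.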

Apart from that, the implementation is pretty straightforward. Note how helpful lambdas and method references are; without them the code would be much more verbose. If you are interested in the details, do not hesitate to check the code.
