Archive for the ‘Threads’ Category

Converting ListenableFutures to CompletableFutures and back

Wednesday, June 11th, 2014

Java 8 introduced CompletableFutures. They build on standard Futures and add completion callbacks, future chaining and other useful stuff. But the world did not wait for Java 8, and a lot of libraries added their own variants of ListenableFutures that serve the same purpose. Even today, some library authors are reluctant to add support for CompletableFutures. It makes sense: Java 8 is quite new, and it's not easy to add support for CompletableFutures and stay compatible with Java 7 at the same time.

Luckily, it's easy to convert to CompletableFutures and back. Let's take Spring 4 ListenableFutures as an example. How do we convert one to a CompletableFuture?

static <T> CompletableFuture<T> buildCompletableFutureFromListenableFuture(
        final ListenableFuture<T> listenableFuture) {
    // create an instance of CompletableFuture
    CompletableFuture<T> completable = new CompletableFuture<T>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            // propagate cancel to the listenable future
            boolean result = listenableFuture.cancel(mayInterruptIfRunning);
            super.cancel(mayInterruptIfRunning);
            return result;
        }
    };

    // add callback
    listenableFuture.addCallback(new ListenableFutureCallback<T>() {
        @Override
        public void onSuccess(T result) {
            completable.complete(result);
        }

        @Override
        public void onFailure(Throwable t) {
            completable.completeExceptionally(t);
        }
    });
    return completable;
}

We just create a CompletableFuture instance and add a callback to the ListenableFuture. In the callback methods we just notify the CompletableFuture that the underlying task has finished, either successfully or with an exception. We can even propagate the call to the cancel method if we want to. That's all you need to convert to a CompletableFuture.

What about the opposite direction? It's more or less straightforward as well.

class ListenableCompletableFutureWrapper<T> implements ListenableFuture<T> {
    private final ListenableFutureCallbackRegistry<T> callbackRegistry
            = new ListenableFutureCallbackRegistry<>();

    private final Future<T> wrappedFuture;

    ListenableCompletableFutureWrapper(CompletableFuture<T> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
        wrappedFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                if (ex instanceof CompletionException && ex.getCause() != null) {
                    callbackRegistry.failure(ex.getCause());
                } else {
                    callbackRegistry.failure(ex);
                }
            } else {
                callbackRegistry.success(result);
            }
        });
    }

    @Override
    public void addCallback(ListenableFutureCallback<? super T> callback) {
        callbackRegistry.addCallback(callback);
    }


    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return wrappedFuture.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return wrappedFuture.isCancelled();
    }

    @Override
    public boolean isDone() {
        return wrappedFuture.isDone();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return wrappedFuture.get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return wrappedFuture.get(timeout, unit);
    }
}

We just wrap the CompletableFuture and again register a callback. The only non-obvious part is the use of ListenableFutureCallbackRegistry, which keeps track of registered ListenableFutureCallbacks. We also have to do some exception processing (a CompletionException is unwrapped so that the callbacks see the original cause), but that's all.
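Why is the exception processing needed at all? The sketch below uses only the JDK, no Spring, and the class and method names are made up for illustration. It shows that a whenComplete callback registered on a dependent stage receives the failure wrapped in a CompletionException, while the same unwrapping rule as in the wrapper recovers the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

class CompletionExceptionDemo {

    // Same rule as in the wrapper: prefer the cause of a CompletionException.
    static Throwable unwrap(Throwable ex) {
        if (ex instanceof CompletionException && ex.getCause() != null) {
            return ex.getCause();
        }
        return ex;
    }

    // Returns the exception that a whenComplete callback on a dependent
    // stage (here created by thenApply) receives when the source fails.
    static Throwable failureSeenByDependentStage(Throwable original) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(String::trim);
        AtomicReference<Throwable> seen = new AtomicReference<>();
        dependent.whenComplete((result, ex) -> seen.set(ex));
        // The non-async callbacks run on this thread during completion.
        source.completeExceptionally(original);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable original = new IllegalStateException("backend failed");
        Throwable seen = failureSeenByDependentStage(original);
        System.out.println("callback saw: " + seen.getClass().getName());
        System.out.println("unwrapped is original: " + (unwrap(seen) == original));
    }
}
```

Without the unwrapping, a ListenableFutureCallback would get a different exception type depending on whether the wrapped CompletableFuture was the source of the failure or a stage derived from it.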

If you need to do something like this, I have good news. I have wrapped the code into a reusable library, so you do not have to copy and paste it; you can just use it as described in the library documentation.

How to specify thread-pool for Java 8 parallel streams

Tuesday, March 18th, 2014

Java 8 streams are cool. Parallel streams are even cooler. They let me easily parallelize operations on large streams of data. For example, if I want to find all prime numbers smaller than one million, I can do this:

 range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).boxed().collect(toList());

Just by calling the parallel() method, I ensure that the stream library will split the stream into smaller chunks which will be processed in parallel. Great. There is only one drawback. It is not possible to specify the thread-pool to be used. All the tasks on parallel streams are executed in the common fork-join pool.
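One way to see this for yourself is to record which pool processes each element. The following is a minimal sketch with made-up class and method names. It relies on ForkJoinTask.getPool(), which returns the pool of the current worker thread, or null when the current thread is not a fork-join worker.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CommonPoolDemo {

    // Records, for every element, which fork-join pool the processing thread
    // belongs to. A null entry means the element was handled by a thread that
    // is not a fork-join worker, for example the thread that started the stream.
    static Set<ForkJoinPool> poolsUsedByParallelStream() {
        return IntStream.range(0, 100_000)
                .parallel()
                .mapToObj(i -> ForkJoinTask.getPool())
                .collect(Collectors.toSet());
    }

    static String describe(ForkJoinPool pool) {
        if (pool == null) {
            return "calling thread (not a pool worker)";
        }
        return pool == ForkJoinPool.commonPool() ? "common pool" : "some other pool";
    }

    public static void main(String[] args) {
        for (ForkJoinPool pool : poolsUsedByParallelStream()) {
            System.out.println(describe(pool));
        }
    }
}
```

When started from an ordinary thread such as main, the only entries you should see are the common pool and possibly the calling thread itself.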

And that's a problem in larger multi-threaded systems. The common pool is a single point of contention. If someone submits a long-running task to the pool, all other tasks have to wait for it to finish. In the worst case, the task may run for hours and effectively block all other threads that use parallel streams.

Luckily, there is a workaround. We can execute the calculation in a pool of our own like this:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);

...

forkJoinPool.submit(() ->
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime)
        .boxed().collect(toList())
).get();

It looks ugly, but with Java 8 we can write the Callable as a lambda expression, so submitting is not so painful. Using this trick, the tasks generated by the parallel stream stay in the same pool. I was afraid that this behavior might be implementation-specific, that it's just a coincidence. Luckily, if you look at ForkJoinTask.fork() you can see that it has to work this way. Its documentation says “Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().” And since parallel streams use the fork-join framework, they use the fork method and thus all the tasks stay in the same pool.
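The claim can be checked with a small experiment. This is only a sketch with hypothetical names, and it shows what OpenJDK does in practice, not a guarantee documented by the stream API: every element of a parallel stream started inside a task of a custom pool reports that same pool from ForkJoinTask.getPool().

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolDemo {

    // Runs a parallel stream inside a task submitted to the given pool and
    // returns the set of pools whose threads processed the elements.
    static Set<ForkJoinPool> poolsUsedWhenSubmittedTo(ForkJoinPool pool) {
        Callable<Set<ForkJoinPool>> work = () -> IntStream.range(0, 100_000)
                .parallel()
                .mapToObj(i -> ForkJoinTask.getPool())
                .collect(Collectors.toSet());
        // join() waits for the result and rethrows failures unchecked.
        return pool.submit(work).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        Set<ForkJoinPool> used = poolsUsedWhenSubmittedTo(pool);
        System.out.println("only the custom pool was used: "
                + (used.size() == 1 && used.contains(pool)));
        pool.shutdown();
    }
}
```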

So we are able to use parallel streams and choose the thread-pool at the same time. But that's not all. The trick solves two other issues you might not be aware of.

The first one is that the submitting thread is used as a worker. In other words, if you execute a calculation on a parallel stream, the thread that submitted the calculation is used for processing. So you are basically mixing threads from a pool with a thread that has a completely different life-cycle. I can imagine several scenarios where it can cause problems. Some of them are described here. By explicitly using a fork-join pool, we make sure that the parallel stream is processed only by threads from that pool.

The other problem is that the parallel processing might take a long time and I would like to limit the time spent on the task. By explicitly using submit, I can specify a timeout in the get method. It comes in handy in real-world systems where we just do not want to hope that everything will go according to plan.
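A minimal sketch of the timeout variant might look like this. The helper runWithTimeout and the deliberately slow slowCheck are hypothetical names used only for illustration. Keep in mind that cancelling a ForkJoinTask is best effort: subtasks that are already running are not interrupted by it.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

class TimeoutDemo {

    // Submits the work to the given pool and waits at most the given time.
    // Returns an empty Optional if the result did not arrive in time.
    static <T> Optional<T> runWithTimeout(ForkJoinPool pool, Callable<T> work,
                                          long timeout, TimeUnit unit) {
        ForkJoinTask<T> task = pool.submit(work);
        try {
            return Optional.ofNullable(task.get(timeout, unit));
        } catch (TimeoutException e) {
            task.cancel(true); // best effort, running subtasks may continue
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("computation failed", e.getCause());
        }
    }

    // Artificially slow predicate so that the timeout is actually hit.
    static boolean slowCheck(int i) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        Callable<Long> slowCount = () -> IntStream.range(0, 1_000)
                .parallel()
                .filter(TimeoutDemo::slowCheck)
                .count();
        Optional<Long> result = runWithTimeout(pool, slowCount, 200, TimeUnit.MILLISECONDS);
        System.out.println(result.isPresent() ? "count = " + result.get() : "timed out");
        pool.shutdownNow(); // stop the workers that are still busy
    }
}
```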

Asynchronous servlet processing

Sunday, April 7th, 2013

Welcome to a new episode of fun with threads. Today we will cover the exciting topic of asynchronous servlet processing. In the last episode we learned that it's not a problem to start more than 10,000 threads in a modern JVM. Today we will play with the asynchronous “threadless” model.

The first important thing to note is that we still need a thread every time our code is doing something. If you have your code in front of you, you always need a thread to move between the lines of the code. But if the code is waiting for something like IO, you do not need a thread, and we can leverage that. We have to realize that the vast majority of Java applications act as a middleman: they just toss data from one side to another. From database to HTML, from proprietary back-end to SOAP WS, from REST service to another REST service. You get the picture. I usually do not write chat applications that need to keep a connection open and wait for an event. I usually end up writing classical request-response applications that just need to handle a lot of requests.

Let's examine how such applications work. What are their characteristics? The first thing to notice is that they are waiting most of the time. They are waiting for an incoming request. When the request comes, they do some processing, validation and transformation and then call a back-end. And then they wait again, this time for the response. When the response arrives, our application does some transformation and sends the response back. And then it waits again.

Request processing

Of course it's application dependent, but in most cases there are a few milliseconds of work followed by tens or hundreds of milliseconds of waiting time. You do not have to block a thread when you are waiting. Even though threads are pretty cheap, we do not want to waste them.

But as is usually the case, there are some tradeoffs. The traditional threaded model is easier to reason about; the asynchronous model is harder to grasp and easier to mess up. I am not talking about shared state and similar stuff. I am just talking about the fact that my brain works better when I can read the code in a linear way and not think about jumping here and there.

Take the following example of a naive proxy using HttpClient 4:

log("Servlet no. {} called.", number);
HttpGet backendRequest = createBackendRequest(req, number);
HttpResponse result = client.execute(backendRequest);
copyResultToResponse(result, resp);
log("Servlet no. {} processed.", number);
log("Servlet no. {} returning.", number);

Full source code

It's evident what's going on. I create a back-end request, then I execute it and then process the result. It's straightforward.

Now let's take a look at the asynchronous version. The easiest way is to delegate the asynchronous processing to libraries. On the front side we will use Servlet 3 asynchronous processing, on the back-end side we will use the asynchronous capabilities of HttpClient. Luckily, both features play well together, so I can use this code.

log("Servlet no. {} called.", number);
HttpGet backendRequest = createBackendRequest(req, number);
// start async processing
final AsyncContext asyncContext = req.startAsync(req, resp);
client.execute(backendRequest, new FutureCallback<HttpResponse>() {
    @Override
    public void completed(HttpResponse result) {
        ServletResponse response = asyncContext.getResponse();
        copyResultToResponse(result, response);
        log("Servlet no. {} processed.", number);
        asyncContext.complete();
    }
    // error handling removed for brevity
});
log("Servlet no. {} returning.", number);

Full source code

Well, it's kind of readable. I can imagine that readability could be better in modern languages. What I do not like is that the code is hard to grasp. Try it for yourself: what will be the order of the log messages? It can be

a) Servlet no. 1 called. - Servlet no. 1 processed. - Servlet no. 1 returning.
b) Servlet no. 1 called. - Servlet no. 1 returning. - Servlet no. 1 processed.
c) Servlet no. 1 returning. - Servlet no. 1 called. - Servlet no. 1 processed.

What's your answer? It's of course the second one. The first one is for the synchronous version and the third is just made up. Why the second one? It's easy. By calling req.startAsync(req, resp) I say that I am starting the asynchronous processing. Then I execute the back-end call, but I put the response processing into a callback, so it's processed only after the back-end response arrives. Now I have finished the request execution, and the request processing thread provided by the servlet container finishes the execution. Of course it has to unroll the whole stack, so it goes through all your filters as if the processing had already finished.

But it has not. It's still waiting for the back-end response. Once the response arrives, the HttpClient calls the callback using its own thread. It has to, because we have already returned the servlet thread to the container.

In the callback we process the response and by calling asyncContext.complete() we say to the servlet container that we have really finished. When we leave the callback, HttpClient will use the same thread to process another back-end response.

So in reality, the log from the asynchronous call is

ServletThread - Servlet no. 1 called.
ServletThread - Servlet no. 1 returning.
HttpClientThread - Servlet no. 1 processed.
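The same ordering can be reproduced without a servlet container. The sketch below is only a simulation built on the JDK: a CompletableFuture stands in for the pending back-end response, a single-threaded executor named HttpClientThread stands in for the client's own thread, and none of the names come from the Servlet or HttpClient APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncOrderDemo {

    static String line(String message) {
        return Thread.currentThread().getName() + " - " + message;
    }

    static List<String> simulate() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Stand-in for the thread owned by the asynchronous HTTP client.
        ExecutorService clientThread = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "HttpClientThread");
            thread.setDaemon(true);
            return thread;
        });
        // Stand-in for the back-end response that has not arrived yet.
        CompletableFuture<String> backendResponse = new CompletableFuture<>();

        // --- what the "servlet" does on the container thread ---
        log.add(line("Servlet no. 1 called."));
        CompletableFuture<Void> processed = backendResponse.thenAcceptAsync(
                body -> log.add(line("Servlet no. 1 processed.")), clientThread);
        log.add(line("Servlet no. 1 returning."));
        // --- the container thread is free again at this point ---

        // Later the back-end answers and the callback runs on the client thread.
        backendResponse.complete("response body");
        processed.join();
        clientThread.shutdown();
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

The callback is registered before the handler returns, but it can only run once the response is there, and by then it is running on a different thread.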

It's easy to understand once you get used to it. But you have to be aware of a few downsides. The first is that servlet filters do not work as you are used to. Do you want to do response postprocessing in a filter? You are out of luck. You can use the asyncContext.dispatch() method, but it adds even more complexity. A better solution may be to use framework support like the one in Spring MVC that can help you with the task.

The other downside is that you have to think more about the threads and sizing of thread pools. For example, if we have more time-consuming response processing, we have to add more threads to the HttpClient thread pool.

The third downside is that if you do not have support in the client library as we had in HttpClient, you have to take care of the threading yourself. Not only does the library need to support asynchronous calls, it also has to support callbacks. If it uses Futures, you have to figure out how to get the result from the future and process it. Again, you will end up with some threading work.
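To give an idea of that threading work, here is one possible way to bridge a plain Future to a callback, sketched with the JDK only. The helper toCompletable is a made-up name, and the approach has an obvious cost: one thread of the waiter executor stays blocked in get() for every pending future, which is exactly what a callback-based client avoids.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureBridgeDemo {

    // Blocks a thread of the given executor until the plain Future is done
    // and then pushes the outcome into a CompletableFuture.
    static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor waiter) {
        CompletableFuture<T> result = new CompletableFuture<>();
        waiter.execute(() -> {
            try {
                result.complete(future.get());
            } catch (ExecutionException e) {
                result.completeExceptionally(e.getCause() != null ? e.getCause() : e);
            } catch (CancellationException e) {
                result.cancel(false);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        ExecutorService backend = Executors.newSingleThreadExecutor();
        ExecutorService waiter = Executors.newSingleThreadExecutor();
        // Stand-in for a client library that only hands out plain Futures.
        Future<String> plainFuture = backend.submit(() -> "back-end response");
        toCompletable(plainFuture, waiter)
                .thenAccept(body -> System.out.println("callback got: " + body))
                .join();
        backend.shutdown();
        waiter.shutdown();
    }
}
```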

So, if the asynchronous model is so complicated, why would we want to use it? Why don't we just use the old synchronous model? The truth is that the synchronous model makes sense for most use-cases. It is able to process a few thousand parallel requests, and that is usually enough for most sites.

But if you have a really massive load and you have to utilize your hardware as much as possible, asynchronous processing performs much better. In my test, I recursively call a servlet which in turn calls itself, which calls itself, until some limit is reached. Jetty is able to process 20,000 connections in less than 15 seconds (after some warm-up when it creates all the connections needed and then just reuses them). Keep in mind that under this test all connections are kept open. So effectively, in the middle of the test I have 20,000 connections open on the servlet side and 20,000 connections open on the HTTP client side. All of this with 30 threads including JVM housekeeping threads. All of this on my five-year-old laptop with less than 1.5GB of heap. Pretty cool.

You can try it for yourself; the source code is available here.