How to specify thread-pool for Java 8 parallel streams

March 18th, 2014

Java 8 streams are cool. Parallel streams are even cooler. They allow me to easily parallelize operations on large streams of data. For example, if I want to find all prime numbers smaller than one million, I can do this:

 // assumes static imports of IntStream.range and Collectors.toList
 range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).boxed().collect(toList());

Just by calling the parallel() method, I ensure that the stream library splits the stream into smaller chunks that are processed in parallel. Great. There is only one drawback: it is not possible to specify the thread-pool to be used. All the tasks of parallel streams are executed in the common fork-join pool (ForkJoinPool.commonPool()).

And that's a problem in larger multi-threaded systems. The common pool is a single point of contention. If someone submits a long-running task to the pool, all other tasks have to wait for it to finish. In the worst case, the task may run for hours and effectively block all other threads that use parallel streams.

Luckily, there is a workaround. We can execute the calculation in a pool of our own, like this:

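ForkJoinPool forkJoinPool = new ForkJoinPool(2);

...

// the lambda is treated as a Callable; get() blocks until the list is ready
List<Integer> primes = forkJoinPool.submit(() ->
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime)
        .boxed().collect(toList())
).get();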

It looks a bit ugly, but luckily with Java 8 we can write the Callable as a lambda expression, so submitting is not so painful. Using this trick, the tasks generated by the parallel stream stay in the same pool. I was afraid that this behavior might be implementation-specific, just a coincidence. Luckily, if you look at ForkJoinTask.fork() you can see that it has to work this way. Its documentation says “Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().” And since parallel streams use the fork-join framework, they use the fork method, and thus all the tasks stay in the same pool.
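To check the claim that the forked subtasks stay in the custom pool, one can record which threads a parallel stream actually runs on. The sketch below is an illustration, not the original PrimesPrint code. The class name `CustomPoolCheck`, the element count and the use of `forEach` to record threads are assumptions made for the demonstration. It submits the stream to a two-thread `ForkJoinPool` and checks that every recorded thread is a `ForkJoinWorkerThread` whose `getPool()` is that pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

class CustomPoolCheck {

    // Runs a parallel stream inside the given pool and reports whether every
    // element was processed by a worker thread that belongs to that pool.
    static boolean staysInPool(ForkJoinPool pool) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        pool.submit(() ->
            IntStream.range(0, 100_000).parallel()
                .forEach(i -> threads.add(Thread.currentThread()))
        ).join(); // join() waits like get() but throws no checked exceptions
        return threads.stream().allMatch(t ->
            t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(staysInPool(pool)); // prints true
        } finally {
            pool.shutdown();
        }
    }
}
```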

So we are able to use parallel streams and choose the thread-pool at the same time. But that's not all. The trick solves two other issues you might not be aware of.

The first one is that the submitting thread is used as a worker. In other words, if you execute a calculation on a parallel stream, the thread that submitted the calculation takes part in the processing. So you are basically mixing threads from a pool with a thread that has a completely different life-cycle. I can imagine several scenarios where this can cause problems. Some of them are described here. By explicitly using a fork-join pool, we make sure that the parallel stream is processed only by threads from that pool.
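The same thread-recording trick can make the first issue visible. In this sketch (the class and method names are made up for illustration), the parallel stream is started directly from the calling thread, so it runs on the common pool. The check then reports whether the caller itself processed any elements. The terminal operation's top-level task is invoked in the calling thread, so this should print `true`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CallerParticipates {

    // Runs a parallel stream straight from the calling thread (so the common
    // pool is used) and reports whether the calling thread itself processed
    // any of the elements.
    static boolean callerDidWork() {
        Thread caller = Thread.currentThread();
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 100_000).parallel()
            .forEach(i -> threads.add(Thread.currentThread()));
        return threads.contains(caller);
    }

    public static void main(String[] args) {
        System.out.println(callerDidWork()); // prints true
    }
}
```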

The other problem is that the parallel processing might take a long time and I would like to limit the time spent on the task. By explicitly using submit, I can specify a timeout in the get method. That comes in handy in real-world systems where we just do not want to hope that everything will go according to plan.
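Here is a sketch of the timeout variant. It assumes a naive trial-division `isPrime` as a stand-in for `PrimesPrint::isPrime`, which the post does not show. It also introduces a hypothetical `primesBelow` helper that calls `ForkJoinTask.get(long, TimeUnit)` and returns an empty `Optional` when the result is not ready in time.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PrimesWithTimeout {

    // Hypothetical stand-in for PrimesPrint::isPrime: naive trial division.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Computes the primes below 'limit' in 'pool', waiting at most 'timeoutMillis'.
    // Returns Optional.empty() if the result is not ready in time.
    static Optional<List<Integer>> primesBelow(ForkJoinPool pool, int limit, long timeoutMillis) {
        ForkJoinTask<List<Integer>> task = pool.submit(() ->
            IntStream.range(1, limit).parallel()
                .filter(PrimesWithTimeout::isPrime)
                .boxed()
                .collect(Collectors.toList()));
        try {
            return Optional.of(task.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            task.cancel(true); // marks the task cancelled; forked subtasks may keep running
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Optional<List<Integer>> primes = primesBelow(pool, 1_000_000, 5_000);
            System.out.println(primes.map(p -> p.size() + " primes").orElse("timed out"));
        } finally {
            pool.shutdown();
        }
    }
}
```

The timeout only stops the caller from waiting. Cancelling the submitted task does not reliably stop the subtasks the stream has already forked, so they may keep the pool busy for a while.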

CompletableFutures – why use async methods?

December 25th, 2013

I have been playing with Java 8 CompletableFutures and one thing has been bothering me: should I use the methods with the Async suffix or the variants without it? Let's take a look at the following example:

CompletableFuture.supplyAsync(() -> getUser(userId))
                .thenApply(CreditRatingService::getCreditRatingSystem1)
                .thenAccept(System.out::println);

private static User getUser(int id) {...}

private static CreditRating getCreditRatingSystem1(User user) {...}

Here I want to get some data about a user from a remote system, then call another system to get the user's credit rating, and once that is finished, print the result. All of these tasks may take some time, so I do not want to block the main thread, hence the use of completable futures.

If you are not yet familiar with Java 8 syntax, () -> getUser(userId) is a lambda calling the getUser method. CreditRatingService::getCreditRatingSystem1 and System.out::println are method references. Underneath, the supplyAsync method creates a task which gets the user details. The task is submitted to the common fork-join pool, and when it finishes, its result is passed to the next task. When the next task finishes, its result is passed on, and so on. It's quite neat and simple.
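For readers who want to run the pipeline, here is a self-contained sketch. `User`, `CreditRating` and both service methods are hypothetical stubs, since the post elides their bodies, and `CreditRatingService` is folded into one demo class. The final `join()` is there only so the demo JVM does not exit before a common-pool thread prints the result. The common pool's workers are daemon threads.

```java
import java.util.concurrent.CompletableFuture;

class UserPipeline {

    // Hypothetical stand-ins for the types and remote calls in the post.
    static final class User {
        final int id;
        User(int id) { this.id = id; }
    }

    static final class CreditRating {
        final int score;
        CreditRating(int score) { this.score = score; }
        @Override public String toString() { return "CreditRating(" + score + ")"; }
    }

    static User getUser(int id) {
        return new User(id);                      // imagine a slow remote call here
    }

    static CreditRating getCreditRatingSystem1(User user) {
        return new CreditRating(user.id * 10);    // and another one here
    }

    // The first two stages of the pipeline; returns the future instead of
    // printing so the caller decides whether and how to wait.
    static CompletableFuture<CreditRating> ratingFor(int userId) {
        return CompletableFuture.supplyAsync(() -> getUser(userId))
            .thenApply(UserPipeline::getCreditRatingSystem1);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> printed = ratingFor(42).thenAccept(System.out::println);
        // main is free to do other work here; join() only keeps this demo JVM
        // alive until the result has been printed
        printed.join(); // prints CreditRating(420)
    }
}
```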

The question is whether I should use the first variant or this one:

CompletableFuture.supplyAsync(() -> getUser(userId))
                .thenApplyAsync(CreditRatingService::getCreditRatingSystem1)
                .thenAcceptAsync(System.out::println);

The difference is in the Async suffix on the method names. The methods without it run their task in the thread that completed the previous task (or, if the previous task has already finished by the time the method is called, in the calling thread). So in the first example, getUser, getCreditRatingSystem1 and println are typically all executed in the same thread. That's OK, it's still a thread from the fork-join pool, so the main thread is not blocked.
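Where a non-async stage runs depends on timing. That is easiest to see with a future that is already complete. In the sketch below (class and method names are illustrative), `thenApply` attached to `CompletableFuture.completedFuture(...)` runs immediately in the calling thread. `thenApplyAsync` hands the function to the default async executor instead. Per the `CompletableFuture` documentation, that executor is the common fork-join pool, or a new thread per task if the common pool's parallelism is below two.

```java
import java.util.concurrent.CompletableFuture;

class WhichThread {

    // Thread that runs a thenApply stage attached to an already-completed future.
    static Thread syncStageThread() {
        return CompletableFuture.completedFuture("user")
            .thenApply(u -> Thread.currentThread())
            .join();
    }

    // Thread that runs a thenApplyAsync stage attached to the same kind of future.
    static Thread asyncStageThread() {
        return CompletableFuture.completedFuture("user")
            .thenApplyAsync(u -> Thread.currentThread())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:         " + Thread.currentThread().getName());
        System.out.println("thenApply:      " + syncStageThread().getName());
        System.out.println("thenApplyAsync: " + asyncStageThread().getName());
    }
}
```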

The second variant always submits the succeeding task to the pool, so each of the tasks can be handled by a different thread. The result will be the same, but the first variant is a bit more efficient because of the lower thread-switching overhead. It does not make any sense to use the async variant here, so what is it good for? It was not clear to me until I extended the example to download the credit rating data from two different systems.

CompletableFuture<User> user = CompletableFuture.supplyAsync(() -> getUser(userId));

CompletableFuture<CreditRating> rating1 = 
    user.thenApplyAsync(CreditRatingService::getCreditRatingSystem1);
CompletableFuture<CreditRating> rating2 = 
    user.thenApplyAsync(CreditRatingService::getCreditRatingSystem2);

rating1
    .thenCombineAsync(rating2, CreditRating::combine)
    .thenAccept(System.out::println);

Here I have two actions waiting for user details. Once the details are available, I want to get two credit ratings from two different systems. Since I want to do it in parallel, I have to use at least one async method. Without async, both rating functions would run in the same thread (the one that completed the user task), so the two credit rating tasks would be executed serially.
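The following sketch makes the parallelism observable. The rating calls are hypothetical stand-ins that meet at a `CountDownLatch`. Each one returns `true` only if the other call was running at the same time, so a serial execution would make the first call time out after two seconds. To keep the result independent of the machine's core count, the sketch passes its own two-thread executor to `supplyAsync` and `thenApplyAsync` instead of relying on the common pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelRatings {

    // Hypothetical rating call: it arrives at the latch and then waits up to two
    // seconds for the other rating call. It only returns true if both calls are
    // running at the same time.
    static boolean fakeRatingCall(CountDownLatch bothStarted) {
        bothStarted.countDown();
        try {
            return bothStarted.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean ratingsRanInParallel() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CountDownLatch bothStarted = new CountDownLatch(2);
            CompletableFuture<String> user =
                CompletableFuture.supplyAsync(() -> "user-42", executor);

            // Two independent async stages hanging off the same user future
            CompletableFuture<Boolean> rating1 =
                user.thenApplyAsync(u -> fakeRatingCall(bothStarted), executor);
            CompletableFuture<Boolean> rating2 =
                user.thenApplyAsync(u -> fakeRatingCall(bothStarted), executor);

            return rating1.thenCombine(rating2, (r1, r2) -> r1 && r2).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(ratingsRanInParallel()); // prints true
    }
}
```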

I have added a combine phase that waits for both credit rating tasks to complete. It's better to make this one async too, but for a different reason. No thread actually sits blocked while waiting for rating2, because the combine step is only registered as a callback. Without async, however, the combine function would run in whichever thread happens to complete the later of the two rating tasks, piggybacking on that task. With async, it is handed back to the pool and gets a thread only when it is needed.
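A small experiment shows which thread runs a non-async `thenCombine` function. In this sketch (names are illustrative), the two inputs are completed by hand. The calling thread completes the first, and a separate thread named `rating2-completer` completes the second. Registering the combine step does not block anything, and the function ends up running in the thread that supplied the last missing input.

```java
import java.util.concurrent.CompletableFuture;

class CombineThread {

    // Returns the thread that ran a non-async thenCombine function when the two
    // inputs were completed by different threads.
    static Thread combinerThread() {
        CompletableFuture<Integer> rating1 = new CompletableFuture<>();
        CompletableFuture<Integer> rating2 = new CompletableFuture<>();

        // Registering the combine step returns immediately; nothing waits here.
        CompletableFuture<Thread> combined =
            rating1.thenCombine(rating2, (r1, r2) -> Thread.currentThread());

        rating1.complete(1); // completed by the calling thread; rating2 still missing
        new Thread(() -> rating2.complete(2), "rating2-completer").start();

        return combined.join(); // the combine ran once the second input arrived
    }

    public static void main(String[] args) {
        System.out.println(combinerThread().getName()); // prints rating2-completer
    }
}
```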

When both tasks are ready, we can combine the results and print them in the same thread, so the last thenAccept is without async.

As we can see, both method variants are useful. Use async when you need to execute more tasks in parallel or when you do not want a follow-up task to run in whatever thread happened to finish the previous one. But since such programs are not easy to reason about, I would recommend using async everywhere. It might be a little less efficient, but the difference is negligible. For a very small price, it gives you the freedom to delegate your thinking to the machine. Just make everything async and let the fork-join framework take care of the optimization.

Let's Listen

December 17th, 2013

You can't move a conversation in a more positive direction if your counterpart doesn't feel heard and understood.
Douglas Stone

I have to confess something. I used to think that the main purpose of communication was to transfer ideas from one head to another. When someone explained something to me, I assumed their main goal was to pass an idea on to me and my job was to help them do it. Once they had passed the idea on, I was supposed to let them know, make sure I had understood it correctly, declare the topic closed and move on.

It sounds reasonable, it fits the model of a rational human being perfectly, and yet it is completely wrong. In many situations the other person doesn't want to hand you information; they simply want to talk it out.

Why am I even writing this? Because it's important and I struggle with it terribly. Recently I stopped a colleague after his first sentence, because I knew that what he was telling me had stopped being true long ago. Mistake. I thought I would save us time, but I didn't. He was thrown off and the discussion that followed was useless.

So what can be done about it? You have to listen. Yes, even when someone is telling you something you have known for ages, you still have to listen. Not just wait for the other person to finish, nodding intelligently while thinking about what I'll say once they finally deign to spit it out. You have to really, fully listen to them. Not only because they may actually tell us something we don't know. Even if we knew exactly what they wanted to tell us, as long as we don't listen to them, they won't listen to us. We end up in a serial monologue, each of us grinding away at our own point without either of us listening to the other.

To quote the literature: “The reason the other person isn't listening to you is not that they're stubborn; it's that they don't feel heard. In other words, they aren't listening to you for the same reason you aren't listening to them. They think you are slow and stubborn. So they repeat themselves, look for new ways to explain it to you, raise their voice, and so on.” Doesn't that remind you of your last difficult meeting? It does me.

I'm not saying listening is easy. I've been trying to learn it for a few months now and I still can't manage it. Yes, I can learn to paraphrase, ask follow-up questions and similar tricks. But that's not what it's about. The real trick is to genuinely take in what the other person is saying. It's terribly hard work, but I'm starting to realize there's no other way.