Java 8 streams are cool. Parallel streams are even cooler. They let me parallelize operations on large streams of data with almost no effort. For example, if I want to find all prime numbers smaller than one million, I can do this
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).boxed().collect(toList());
Just by calling the parallel() method, I ensure that the stream library splits the stream into smaller chunks that are processed in parallel. Great. There is only one drawback: it is not possible to specify the thread pool to be used. All tasks on parallel streams are executed in the common fork-join pool.
And that’s a problem in larger multi-threaded systems. The common pool is a single point of contention. If someone submits a long-running task to the pool, other tasks may have to wait for it to finish. In the worst case, the task may run for hours and effectively block all other threads that use parallel streams.
Luckily, there is a workaround. We can execute the calculation in a pool of our own like this
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
...
forkJoinPool.submit(() ->
    range(1, 1_000_000).parallel()
        .filter(PrimesPrint::isPrime)
        .boxed()
        .collect(toList())
).get();
It looks ugly, but with Java 8 we can write the Callable as a lambda expression, so submitting is not so painful. Using this trick, the tasks generated by the parallel stream stay in the same pool. I was afraid that this behavior might be implementation-specific, that it’s just a coincidence. Luckily, if you look at ForkJoinTask.fork() you can see that it has to work this way. Its documentation says “Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().” And since parallel streams use the fork-join framework, they use the fork method, and thus all the tasks stay in the same pool.
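To check this for myself, a small sketch like the following records which threads actually process the elements. The class name PoolThreadNames and the helper threadsUsedIn are made up for this illustration, and the thread naming it relies on is what I see on OpenJDK, not something the specification promises.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example.
class PoolThreadNames {

    // Runs a parallel stream inside the given pool and returns the names of
    // all threads that processed at least one element.
    static Set<String> threadsUsedIn(ForkJoinPool pool) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        try {
            // forEach returns nothing, so this lambda is submitted as a Runnable.
            pool.submit(() ->
                IntStream.range(0, 10_000)
                         .parallel()
                         .forEach(i -> names.add(Thread.currentThread().getName()))
            ).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return names;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // Assumption: on OpenJDK, workers of a custom pool have names
            // that do not contain "commonPool".
            System.out.println(threadsUsedIn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

If the trick works as described, none of the printed names should belong to the common pool or to the submitting thread.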
So we are able to use parallel streams and choose the thread pool at the same time. But that’s not all. The trick solves two other issues you might not be aware of.
The first one is that the submitting thread is used as a worker. In other words, if you execute a calculation on a parallel stream, the thread that submitted the calculation is also used for processing. So you are basically mixing threads from a pool with a thread that has a completely different life cycle. I can imagine several scenarios where this can cause problems. Some of them are described here. By explicitly using a fork-join pool, we make sure that the parallel stream is processed only by threads from that pool.
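The first issue is easy to see with a plain parallel stream that is not submitted to any pool. The sketch below is only an illustration; CallerAsWorker and threadsUsed are names I made up for it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example.
class CallerAsWorker {

    // Runs a parallel stream directly from the calling thread (no explicit
    // pool) and returns the names of all threads that processed elements.
    static Set<String> threadsUsed() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                 .parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = threadsUsed();
        // The calling thread takes part in the work next to the pool workers.
        System.out.println("caller took part: "
                + names.contains(Thread.currentThread().getName()));
        System.out.println(names);
    }
}
```

The calling thread shows up in the set because it executes the root task of the stream itself and hands only part of the work over to the common pool.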
The other problem is that the parallel processing might take a long time, and I would like to limit the time spent on the task. By explicitly using submit, I can specify a timeout in the get method. That comes in handy in real-world systems, where we do not want to just hope that everything will go according to plan.
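A minimal sketch of the timeout variant could look like this. TimedPrimes, primesBelow and this particular isPrime are hypothetical stand-ins for the example (the real PrimesPrint::isPrime is not shown here), and creating a pool per call is done only to keep the sketch self-contained.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original example.
class TimedPrimes {

    // Stand-in for PrimesPrint::isPrime: simple trial division.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Returns the primes below limit, or an empty Optional if the
    // calculation did not finish within the timeout.
    static Optional<List<Integer>> primesBelow(int limit, long timeout, TimeUnit unit) {
        ForkJoinPool pool = new ForkJoinPool(2);
        ForkJoinTask<List<Integer>> task = pool.submit(() ->
            IntStream.range(1, limit)
                     .parallel()
                     .filter(TimedPrimes::isPrime)
                     .boxed()
                     .collect(Collectors.toList()));
        try {
            return Optional.of(task.get(timeout, unit));
        } catch (TimeoutException e) {
            // The timeout only bounds how long the caller waits;
            // cancellation of work already in progress is best effort.
            task.cancel(true);
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(primesBelow(1_000_000, 5, TimeUnit.SECONDS)
                .map(primes -> primes.size() + " primes found")
                .orElse("timed out"));
    }
}
```

Returning an Optional is just one way to report the timeout; rethrowing the TimeoutException to the caller would work as well.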
Uh, how exactly does PrimesPrint::isPrime work?
This seems like a quite inappropriate use of parallel computation.
Your program would perform much faster if it were stateful and single-threaded.
Sure, it’s just an example.
do you have a full example of this? I am wondering how this would work for streams that do not produce a result.
thanks
You can just submit a Runnable, but I cannot imagine a use case for streams that do not produce a result. Some examples are here
http://leakfromjavaheap.blogspot.com/2014/06/a-gentle-touch-of-functional.html