Introduction

In Java 8, the forEach, search and reduce methods of the ConcurrentHashMap have parallel operation capability. Normally, it is indicated by the parameter called parallelismThreshold. This parameter specifies the number of elements before the execution becomes parallel. For example, if the parallelismThreshold is 10 then if the number of elements to process is below 10 then it will be processed in the current thread (i.e. single thread) otherwise it will be processed in parallel.

Using the forEach method

The forEach method has different overloads but we will just be focusing on the simplest one. The one that accepts parallelismThreshold and BiConsumer parameters. To demonstrate parallel execution in action lets start with the ConcurrentHashMap that has items less than the parallelismThreshold like the following snippet (see the complete code at the bottom):

Snippet 1 - Elements less than parallelismThreshold
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

//This will generate a stream with 1 to 9 as integer content.
IntStream.range(1, 10).forEach(item -> map.put(String.valueOf(item), item));

long parallelismThreshold = 10;

BiConsumer<String, Integer> biConsumer = (key, value) -> {
    System.out.println("key: " + key + " value: " + value + " thread: " + Thread.currentThread().getName());
};

map.forEach(parallelismThreshold, biConsumer);

Running the above snippet will not run in parallel execution since the items in the map is less than the parallelismThreshold. But updating the variableĀ parallelismThreshold variable to 9 and run it again. We must see an output like the following output:

Output 1 - Updating the parallelismThreshold to 9 in snippet 1
key: 1 value: 1 thread: main
key: 8 value: 8 thread: ForkJoinPool.commonPool-worker-1
key: 2 value: 2 thread: main
key: 9 value: 9 thread: ForkJoinPool.commonPool-worker-1
key: 3 value: 3 thread: main
key: 4 value: 4 thread: main
key: 5 value: 5 thread: main
key: 6 value: 6 thread: main
key: 7 value: 7 thread: main

Notice the ForkJoinPool.commonPool-worker thread which indicates that the forEach method is operating on parallel.

Using the search method

The search method is the functionality to find an item in the map with parallel support. This methods has siblings namely: searchEntries, searchKeys and searchValues but we will just be focusing on the search method with parallelismThreshold and BiFunction parameters. For the sample usage, see the following snippetĀ (see the complete code at the bottom).

Snippet 2 - Sample usage of search method
BiFunction<String, Integer, Integer> biFunction = (key, value) -> {

    if (key.equals("9")) {
        return value; //Stop searching.
    }
    return null; //Continue searching.
};

System.out.println("Value: " + map.search(parallelismThreshold, biFunction));

We can run the previous snippet after concatenating it with snippet 1. The search will stop when the BiFunction implementation return non-null value. Otherwise, the search will continue. This is good if you have a big map.

Using the reduce method

The reduce method is the one to use if we need to accumulate something from the map items to produce a single result. This also supports parallel operation and has many siblings (e.g. reduceEntries, reduceKeys, reduceValues, etc ...). But, we will only focus on the reduce method itself that accepts parallelismThreshold, transformer (i.e. BiFunction) and reducer (i.e. BiFunction).

The transformer parameter is the one that prepares what the reducer will work on. Like for example if the wanted to work on the values of the map from snippet 1. The transformer would be something like the following:

Snippet 3 - Transformer logic
BiFunction<String, Integer, Integer> transformer = (key, value) -> value;

Knowing what the transformer will give us. We can write a reducer logic what will add all the values returned by the transformer logic like the following:

Snippet 4 - Reducer logic
BiFunction<Integer, Integer, Integer> reducer = (aggr, value) -> aggr + value;

From the snippet above, the aggr parameter is a variable the accumulates the value argument.

Having the transformer and the reducer ready we can invoke the reduce method like the following (see the complete code at the bottom):

Snippet 5 - Invoking the reduce method
System.out.println("Total: " + map.reduce(parallelismThreshold, transformer, reducer));

The Complete Code

package xyz.ronella.concurrency;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.IntStream;

public class HashMap {
    public static void main(String ... args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        //This will generate a stream with 1 to 9 as integer content.
        IntStream.range(1, 10).forEach(item -> map.put(String.valueOf(item), item));

        long parallelismThreshold = 9;

        BiConsumer<String, Integer> biConsumer = (key, value) -> {
            System.out.println("key: " + key + " value: " + value + " thread: " + Thread.currentThread().getName());
        };

        map.forEach(parallelismThreshold, biConsumer);

        BiFunction<String, Integer, Integer> biFunction = (key, value) -> {
            if (key.equals("9")) {
                return value; //Stop searching.
            }
            return null; //Continue searching.
        };

        System.out.println("Value: " + map.search(parallelismThreshold, biFunction));

        BiFunction<String, Integer, Integer> transformer = (key, value) -> value;

        BiFunction<Integer, Integer, Integer> reducer = (aggr, value) -> aggr + value;

        System.out.println("Total: " + map.reduce(parallelismThreshold, transformer, reducer));
    }
}