Extremely Serious

Category: Concurrency

ThreadPoolExecutor with ArrayBlockingQueue and Custom Thread Name

Create an implementation of ThreadFactory to create a thread with custom name for ThreadPoolExecutor as follows:

class MyThreadFactory implements ThreadFactory {
    private AtomicLong threadCounter;

    private MyThreadFactory() {
        threadCounter = new AtomicLong();
    }

    @Override
    public Thread newThread(Runnable runnable) {
        var thread=new Thread(runnable);
        thread.setName(String.join("-","my-thread",
                String.valueOf(threadCounter.incrementAndGet())));
        return thread;
    }
}

The preceding class will generate a thread with the name starting with my-thread. Use the instance of this class in constructing the ThreadPoolExecutor as follows:

var executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(100), new MyThreadFactory(), 
        new ThreadPoolExecutor.CallerRunsPolicy());

The preceding declaration creates an instance of ThreadPoolExecutor with 2 core threads, 4 maximum threads, 60 seconds keep alive and supports 100 items in the queue. The queue size is defined by the instance of ArrayBlockingQueue class.

Start all the core threads as follows:

executor.prestartAllCoreThreads();

Using a profiling tool we can search for all the threads whose names starts with my-thread like the following screenshot:

Don't forget to call any shutdown/terminate methods when done using the executor.

Using RxJava PublishSubject Concurrently

Introduction

Implementing hot observable can be achieved with RxJava's PublishSubject. Moreover, we will try to publish data to the subject concurrently safely (i.e. using the synchronized keyword). Thus the complete code at the bottom uses several threads for publishing data and several threads for subscription.

Creating an Instance of PublishSubject

A simple way to create an instance of the PublishSubject instance is by invoking the static method create like the following snippet (see the complete code at the bottom).

Snippet 1 - Invoking the Static Method Create
PublishSubject subject = PublishSubject.create();

Sending Data to PublishSubject

Sending data to a PublishSubject instance can be done by calling its onNext method. Place the actual call in a synchronized block if we are sending data concurrently like the following snippet (see the complete code at the bottom):

Snippet 2 - Sending Data to PublishSubject Instance
synchronized (subject) {
    subject.onNext(String.valueOf(strItem));
}

Subscribing to PublishSubject

To listen to any of the data sent to a PublishSubject instance we can use one of the subscribe method. In addition, we can also opt to use the computation Scheduler (i.e. normally the Scheduler is the one responsible to managing threads in a multi-threaded environment.) via the observeOn method (i.e. for some reason the subscribeOn method is not working with PublishSubject instance.) like the following  snippet (see the complete code at the bottom):

Snippet 3 - Subscribing to PublishSubject Instance
subject.observeOn(Schedulers.computation())
        .subscribe(___item -> {
    System.out.println(_name + " ThreadID:" + Thread.currentThread().getId() + ": " + ___item);
});

The Complete Code

package xyz.ronella.reactive;

import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.IntStream;

public class PublishedObservable {

    public static void main(String ... args) {
        final PublishSubject subject = PublishSubject.create();

        List publishers = new ArrayList<>();
        List subscribers = new ArrayList<>();

        int defaultThreadCount = 3;

        ExecutorService executorPublisher = Executors.newFixedThreadPool(defaultThreadCount);
        ExecutorService executorSubscriber = Executors.newFixedThreadPool(defaultThreadCount);

        class LocalPublisher implements Runnable {

            private long _item;
            private String _name;
            private PublishSubject _subject;

            public LocalPublisher(String name, long itemStart, PublishSubject subject) {
                this._name = name;
                this._item = itemStart;
                this._subject = subject;
            }

            @Override
            public void run() {
                try {
                    while(true) {
                        Thread.sleep(1000);
                        String strItem = String.valueOf(++_item);
                        System.out.println("\n" + _name + " ThreadID:" + Thread.currentThread().getId() + ": " + strItem);
                        synchronized (_subject) {
                            _subject.onNext(String.valueOf(strItem));
                        }
                    }
                } catch (InterruptedException e) {
                    System.out.println(_name + " interrupted.");
                }
            }
        }

        class LocalSubscriber implements Runnable {

            private String _name;
            private PublishSubject _subject;

            public LocalSubscriber(String name, PublishSubject subject) {
                this._name = name;
                this._subject = subject;
            }

            @Override
            public void run() {
                _subject.observeOn(Schedulers.computation())
                        .subscribe(___item -> {
                    System.out.println(_name + " ThreadID:" + Thread.currentThread().getId() + ": " + ___item);
                });
            }
        }

        IntStream.rangeClosed(1, 3).forEach(___idx ->
                subscribers.add(executorSubscriber.submit(new LocalSubscriber("Subscriber " + ___idx, subject))));

        IntStream.rangeClosed(1, 6).forEach(___idx ->
                publishers.add(executorPublisher.submit(new LocalPublisher("Publisher " + ___idx,___idx * 100, subject))));

        subject.subscribe( ___item -> System.out.println("Main - ThreadID:" + Thread.currentThread().getId() + " " + ___item));

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executorPublisher.shutdownNow();
        executorSubscriber.shutdown();
    }
}

Producer and Consumer Implementations

Introduction

When trying to understand concurrency the producer and consumer implementation is normally the first concept to know. The producer will place in data to a shared resource only if the consumer has already emptied it. Where the producer and consumer are two different workers.

Using Wait and Notify

This is the classic way of implementing producer and consumer. We call the wait method if we want to block the execution and notify if we want to continue. Since theses methods are part of the Object class, every class in java has them. However, to use them the current thread must acquire the a monitor otherwise an IllegalMonitorStateException will be thrown. One of the way to acquire a monitor is to use the synchronized block. This is what we will be using in the following code.

Code 1 - Wait and Notify Implementation
package xyz.ronella.concurrency;

import java.util.Stack;
import java.util.stream.IntStream;

public class ProducerConsumerWaitNotify {

    public static void main (String ... args) {

        final Stack sharedData = new Stack<>();
        final Object monitor = new Object();

        //The data to process.
        final IntStream dataRange = IntStream.range(1, 10);

        Thread producer = new Thread(() -> {

            try {
                dataRange.forEach(___data -> {
                    synchronized (monitor) {
                        try {
                            while (sharedData.isEmpty()) {
                                sharedData.push(___data);
                                System.out.println("Producing " + ___data);

                                monitor.notify();
                            }

                            monitor.wait();
                        } catch (InterruptedException e) {
                            System.out.println("Producer Interrupted.");
                            throw new RuntimeException();
                        }
                    }
                });
            }
            catch (RuntimeException e) {
                System.out.println("Final producer exit.");
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    synchronized (monitor) {
                        while (!sharedData.isEmpty()) {
                            System.out.println("Consuming " + sharedData.pop());
                            monitor.notify();
                        }

                        monitor.wait();
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer Interrupted.");
            }
        });

        consumer.start();
        producer.start();

        Delayer.delay(1000);

        producer.interrupt();
        consumer.interrupt();

        System.out.println("Done");
    }

}

Notice the Delayer.delay(100) (see the Delayer code) near the end of the code. It is only there to delay the exit of the main thread.

Using Lock Conditions

An alternative to implementing producer and consumer is using the lock conditions. Some of the advantages of using Lock interface over synchronized is that it is not required to code it in a single block and can also cross scopes (e.g. you can acquire lock in a method and unlock it in different method.).

To use lock conditions must first create an instance of Lock like the following:

Snippet 1 - Instantiates a Lock
final Lock lock = new ReentrantLock();

Once created, we can now create an instance of Condition for both producer and consumer like the following:

Snippet 2 - Create Producer and Consumer Conditions
final Condition producerCond = lock.newCondition(); final Condition consumerCond = lock.newCondition();

We acquire the monitor using the lock method and release it using unlock method of the Lock instance like the following (see the actual usage on code 2):

Snippet 3 - Acquiring and Releasing a Monitor using Lock Interface
try{
	lock.lock();

	.
	.
	.

}
finally{
	lock.unlock();
}

Knowing all of these, we can now combine them to the following code.

Code 2 - Lock Condition Implementation
package xyz.ronella.concurrency;

import java.util.Stack;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

public class ProducerConsumerLockCondition {

    public static void main (String ... args) {

        final Stack sharedData = new Stack<>();
        final Lock lock = new ReentrantLock();
        final Condition producerCond = lock.newCondition();
        final Condition consumerCond = lock.newCondition();

        //The data to process.
        final IntStream dataRange = IntStream.range(1, 10);

        Thread producer = new Thread(() -> {

            try {
                lock.lock();
                dataRange.forEach(___data -> {
                    try {
                        while (sharedData.isEmpty()) {
                            sharedData.push(___data);
                            System.out.println("Producing " + ___data);

                            consumerCond.signal();
                        }

                        producerCond.await();
                    } catch (InterruptedException e) {
                        System.out.println("Producer Interrupted.");
                        throw new RuntimeException();
                    }
                });
            }
            catch (RuntimeException e) {
                System.out.println("Final producer exit.");
            }
            finally {
                lock.unlock();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                lock.lock();
                while (true) {
                    if (!sharedData.isEmpty()) {
                        System.out.println("Consuming " + sharedData.pop());
                        producerCond.signal();
                    }

                    consumerCond.await();
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer Interrupted.");
            }
            finally {
                lock.unlock();
            }
        });

        consumer.start();
        producer.start();

        Delayer.delay(1000);

        producer.interrupt();
        consumer.interrupt();

        System.out.println("Done");
    }
}

Using ArrayBlockingQueue

Another implementation is to just the use an instance of ArrayBlockingQueue and use put and take methods.  The put method blocks until a space (i.e. based on the capacity) is available and the take method blocks until there are some data to take. Thus, we don't need any explicit locking for this.

We define the capacity of an ArrayBlockingQueue on its constructor as we can see as part of the following code.

Code 3 - ArrayBlockingQueue Implementation
package xyz.ronella.concurrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.IntStream;

public class ProducerConsumerArrayBlockingQueue {

    public static void main (String ... args) {

        final long delay = 10;
        final BlockingQueue sharedData = new ArrayBlockingQueue<>(1);

        //The data to process.
        final IntStream dataRange = IntStream.range(1, 10);

        Thread producer = new Thread(() -> {

            try {
                dataRange.forEach(___data -> {
                    try {
                        sharedData.put(___data);
                        System.out.println("Producing " + ___data);
                    } catch (InterruptedException e) {
                        System.out.println("Producer Interrupted.");
                        throw new RuntimeException();
                    }
                });
            }
            catch (RuntimeException e) {
                System.out.println("Final producer exit.");
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(delay);
                    //This will block if the sharedData is empty.
                    int data = sharedData.take();
                    System.out.println("Consuming " + data);
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer Interrupted.");
            }
        });

        consumer.start();
        producer.start();

        Delayer.delay(1000);

        producer.interrupt();
        consumer.interrupt();

        System.out.println("Done");
    }
}

The Delayer Code

package xyz.ronella.concurrency;

public class Delayer {

    private Delayer() {}

    public static void delay(long millis) {
        final Object mainMonitor = new Object();
        Thread mainTimer = new Thread(() -> {
            try {
                Thread.sleep(millis);
                synchronized (mainMonitor) {
                    mainMonitor.notify();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        mainTimer.start();

        synchronized (mainMonitor) {
            try {
                mainMonitor.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

ConcurrentHashMap Parallel Operations

Introduction

In Java 8, the forEach, search and reduce methods of the ConcurrentHashMap have parallel operation capability. Normally, it is indicated by the parameter called parallelismThreshold. This parameter specifies the number of elements before the execution becomes parallel. For example, if the parallelismThreshold is 10 then if the number of elements to process is below 10 then it will be processed in the current thread (i.e. single thread) otherwise it will be processed in parallel.

Using the forEach method

The forEach method has different overloads but we will just be focusing on the simplest one. The one that accepts parallelismThreshold and BiConsumer parameters. To demonstrate parallel execution in action lets start with the ConcurrentHashMap that has items less than the parallelismThreshold like the following snippet (see the complete code at the bottom):

Snippet 1 - Elements less than parallelismThreshold
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

//This will generate a stream with 1 to 9 as integer content.
IntStream.range(1, 10).forEach(item -> map.put(String.valueOf(item), item));

long parallelismThreshold = 10;

BiConsumer<String, Integer> biConsumer = (key, value) -> {
    System.out.println("key: " + key + " value: " + value + " thread: " + Thread.currentThread().getName());
};

map.forEach(parallelismThreshold, biConsumer);

Running the above snippet will not run in parallel execution since the items in the map is less than the parallelismThreshold. But updating the variable parallelismThreshold variable to 9 and run it again. We must see an output like the following output:

Output 1 - Updating the parallelismThreshold to 9 in snippet 1
key: 1 value: 1 thread: main
key: 8 value: 8 thread: ForkJoinPool.commonPool-worker-1
key: 2 value: 2 thread: main
key: 9 value: 9 thread: ForkJoinPool.commonPool-worker-1
key: 3 value: 3 thread: main
key: 4 value: 4 thread: main
key: 5 value: 5 thread: main
key: 6 value: 6 thread: main
key: 7 value: 7 thread: main

Notice the ForkJoinPool.commonPool-worker thread which indicates that the forEach method is operating on parallel.

Using the search method

The search method is the functionality to find an item in the map with parallel support. This methods has siblings namely: searchEntries, searchKeys and searchValues but we will just be focusing on the search method with parallelismThreshold and BiFunction parameters. For the sample usage, see the following snippet (see the complete code at the bottom).

Snippet 2 - Sample usage of search method
BiFunction<String, Integer, Integer> biFunction = (key, value) -> {

    if (key.equals("9")) {
        return value; //Stop searching.
    }
    return null; //Continue searching.
};

System.out.println("Value: " + map.search(parallelismThreshold, biFunction));

We can run the previous snippet after concatenating it with snippet 1. The search will stop when the BiFunction implementation return non-null value. Otherwise, the search will continue. This is good if you have a big map.

Using the reduce method

The reduce method is the one to use if we need to accumulate something from the map items to produce a single result. This also supports parallel operation and has many siblings (e.g. reduceEntries, reduceKeys, reduceValues, etc ...). But, we will only focus on the reduce method itself that accepts parallelismThreshold, transformer (i.e. BiFunction) and reducer (i.e. BiFunction).

The transformer parameter is the one that prepares what the reducer will work on. Like for example if the wanted to work on the values of the map from snippet 1. The transformer would be something like the following:

Snippet 3 - Transformer logic
BiFunction<String, Integer, Integer> transformer = (key, value) -> value;

Knowing what the transformer will give us. We can write a reducer logic what will add all the values returned by the transformer logic like the following:

Snippet 4 - Reducer logic
BiFunction<Integer, Integer, Integer> reducer = (aggr, value) -> aggr + value;

From the snippet above, the aggr parameter is a variable the accumulates the value argument.

Having the transformer and the reducer ready we can invoke the reduce method like the following (see the complete code at the bottom):

Snippet 5 - Invoking the reduce method
System.out.println("Total: " + map.reduce(parallelismThreshold, transformer, reducer));

The Complete Code

package xyz.ronella.concurrency;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.IntStream;

public class HashMap {
    public static void main(String ... args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        //This will generate a stream with 1 to 9 as integer content.
        IntStream.range(1, 10).forEach(item -> map.put(String.valueOf(item), item));

        long parallelismThreshold = 9;

        BiConsumer<String, Integer> biConsumer = (key, value) -> {
            System.out.println("key: " + key + " value: " + value + " thread: " + Thread.currentThread().getName());
        };

        map.forEach(parallelismThreshold, biConsumer);

        BiFunction<String, Integer, Integer> biFunction = (key, value) -> {
            if (key.equals("9")) {
                return value; //Stop searching.
            }
            return null; //Continue searching.
        };

        System.out.println("Value: " + map.search(parallelismThreshold, biFunction));

        BiFunction<String, Integer, Integer> transformer = (key, value) -> value;

        BiFunction<Integer, Integer, Integer> reducer = (aggr, value) -> aggr + value;

        System.out.println("Total: " + map.reduce(parallelismThreshold, transformer, reducer));
    }
}

Using CyclicBarrier with Java

CyclicBarrier was designed to allow threads to wait for each other to a certain barrier and doing it again and again (i.e. the reason why it is called cyclic).

Creating a CyclicBarrier aware Task
  1. Create a task that will accepts and instance of CyclicBarrier like the following snippet (i.e. the complete code will be at the bottom).
    public Teller(CyclicBarrier barrier, String message) {
        _barrier = barrier;
        _message = message;
    }
  2. Within the task we must call the await method of the instance of the CyclicBarrier like the following snippet (i.e. the complete code will be at the bottom). The number of call to this method is the one being tracked by the instance of the CyclicBarrier to compare to the number of parties (i.e. normally the first or only argument of the constructor.) specified during it's initialization.
    @Override
    public String call() throws Exception {
        System.out.println("Processing: " + _message);
        _barrier.await();
        return _message;
    }
Using the CyclicBarrier aware Task
  1. Create an instance of CyclicBarrier class with the expected number of parties (i.e. normally this is the first or only argument of the constructor) before it unblocks itself for all the threads waiting then back to blocking. Optionally, we can also pass a second argument of type Runnable that will only be invoked when the barrier unblocks like the following snippet (i.e. the complete code will be at the bottom).
    CyclicBarrier barrier = new CyclicBarrier(2 /*The number of parties. */
    , () -> System.out.println("Barrier open.") /*The logic to call when the barrier unblocks. */
    );
  2. Submit a number of tasks that corresponds to the multiple of the number of parties (e.g. from step 1 it could be multiple of 2) from step 1 like the following snippet (i.e. the complete code will be at the bottom):
    futures.add(executor.submit(new Teller(barrier, "One")));
    futures.add(executor.submit(new Teller(barrier, "Two")));
    futures.add(executor.submit(new Teller(barrier, "Three")));
    futures.add(executor.submit(new Teller(barrier, "Four")));
    
    futures.forEach((Future future) -> {
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
Observation

Upon running the complete code:

  • We must notice that the following message will be executed every multiple of 2 (i.e. observe the processing message):
    Barrier open.
  • The following message (i.e. can be in any order) will never be displayed before One and Two (i.e. can be in any order):
    Three
    Four
The Complete Code
package xyz.ronella.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Barrier {

    public static void main(String[] args) {

        class Teller implements Callable {

            private CyclicBarrier _barrier;
            private String _message;

            public Teller(CyclicBarrier barrier, String message) {
                _barrier = barrier;
                _message = message;
            }

            @Override
            public String call() throws Exception {
                System.out.println("Processing: " + _message);
                _barrier.await();
                return _message;
            }
        }

        ExecutorService executor = Executors.newFixedThreadPool(2);

        CyclicBarrier barrier = new CyclicBarrier(2 /*The number of parties. */
                , () -> System.out.println("Barrier open.") /*The logic to call when the barrier unblocks. */
        );

        List<Future> futures = new ArrayList<>();

        try {
            futures.add(executor.submit(new Teller(barrier, "One")));
            futures.add(executor.submit(new Teller(barrier, "Two")));
            futures.add(executor.submit(new Teller(barrier, "Three")));
            futures.add(executor.submit(new Teller(barrier, "Four")));

            futures.forEach((Future future) -> {
                try {
                    System.out.println(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }
        finally {
            executor.shutdown();
        }
    }
}