Introduction

When trying to understand concurrency the producer and consumer implementation is normally the first concept to know. The producer will place in data to a shared resource only if the consumer has already emptied it. Where the producer and consumer are two different workers.

Using Wait and Notify

This is the classic way of implementing producer and consumer. We call the wait method if we want to block the execution and notify if we want to continue. Since theses methods are part of the Object class, every class in java has them. However, to use them the current thread must acquire the a monitor otherwise an IllegalMonitorStateException will be thrown. One of the way to acquire a monitor is to use the synchronized block. This is what we will be using in the following code.

Code 1 - Wait and Notify Implementation
package xyz.ronella.concurrency;

import java.util.Stack;
import java.util.stream.IntStream;

public class ProducerConsumerWaitNotify {

    public static void main (String ... args) {

        final Stack sharedData = new Stack<>();
        final Object monitor = new Object();

        //The data to process.
        final IntStream dataRange = IntStream.range(1, 10);

        Thread producer = new Thread(() -> {

            try {
                dataRange.forEach(___data -> {
                    synchronized (monitor) {
                        try {
                            while (sharedData.isEmpty()) {
                                sharedData.push(___data);
                                System.out.println("Producing " + ___data);

                                monitor.notify();
                            }

                            monitor.wait();
                        } catch (InterruptedException e) {
                            System.out.println("Producer Interrupted.");
                            throw new RuntimeException();
                        }
                    }
                });
            }
            catch (RuntimeException e) {
                System.out.println("Final producer exit.");
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    synchronized (monitor) {
                        while (!sharedData.isEmpty()) {
                            System.out.println("Consuming " + sharedData.pop());
                            monitor.notify();
                        }

                        monitor.wait();
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer Interrupted.");
            }
        });

        consumer.start();
        producer.start();

        Delayer.delay(1000);

        producer.interrupt();
        consumer.interrupt();

        System.out.println("Done");
    }

}

Notice the Delayer.delay(100) (see the Delayer code) near the end of the code. It is only there to delay the exit of the main thread.

Using Lock Conditions

An alternative to implementing producer and consumer is using the lock conditions. Some of the advantages of using Lock interface over synchronized is that it is not required to code it in a single block and can also cross scopes (e.g. you can acquire lock in a method and unlock it in different method.).

To use lock conditions must first create an instance of Lock like the following:

Snippet 1 - Instantiates a Lock
final Lock lock = new ReentrantLock();

Once created, we can now create an instance of Condition for both producer and consumer like the following:

Snippet 2 - Create Producer and Consumer Conditions
final Condition producerCond = lock.newCondition(); final Condition consumerCond = lock.newCondition();

We acquire the monitor using the lock method and release it using unlock method of the Lock instance like the following (see the actual usage on code 2):

Snippet 3 - Acquiring and Releasing a Monitor using Lock Interface
try{
	lock.lock();

	.
	.
	.

}
finally{
	lock.unlock();
}

Knowing all of these, we can now combine them to the following code.

Code 2 - Lock Condition Implementation
package xyz.ronella.concurrency;

import java.util.Stack;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

public class ProducerConsumerLockCondition {

    public static void main (String ... args) {

        final Stack sharedData = new Stack<>();
        final Lock lock = new ReentrantLock();
        final Condition producerCond = lock.newCondition();
        final Condition consumerCond = lock.newCondition();

        //The data to process.
        final IntStream dataRange = IntStream.range(1, 10);

        Thread producer = new Thread(() -> {

            try {
                lock.lock();
                dataRange.forEach(___data -> {
                    try {
                        while (sharedData.isEmpty()) {
                            sharedData.push(___data);
                            System.out.println("Producing " + ___data);

                            consumerCond.signal();
                        }

                        producerCond.await();
                    } catch (InterruptedException e) {
                        System.out.println("Producer Interrupted.");
                        throw new RuntimeException();
                    }
                });
            }
            catch (RuntimeException e) {
                System.out.println("Final producer exit.");
            }
            finally {
                lock.unlock();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                lock.lock();
                while (true) {
                    if (!sharedData.isEmpty()) {
                        System.out.println("Consuming " + sharedData.pop());
                        producerCond.signal();
                    }

                    consumerCond.await();
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer Interrupted.");
            }
            finally {
                lock.unlock();
            }
        });

        consumer.start();
        producer.start();

        Delayer.delay(1000);

        producer.interrupt();
        consumer.interrupt();

        System.out.println("Done");
    }
}

Using ArrayBlockingQueue

Another implementation is to just the use an instance of ArrayBlockingQueue and use put and take methods.  The put method blocks until a space (i.e. based on the capacity) is available and the take method blocks until there are some data to take. Thus, we don't need any explicit locking for this.

We define the capacity of an ArrayBlockingQueue on its constructor as we can see as part of the following code.

Code 3 - ArrayBlockingQueue Implementation
package xyz.ronella.concurrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.IntStream;

public class ProducerConsumerArrayBlockingQueue {

    public static void main (String ... args) {

        final long delay = 10;
        final BlockingQueue sharedData = new ArrayBlockingQueue<>(1);

        //The data to process.
        final IntStream dataRange = IntStream.range(1, 10);

        Thread producer = new Thread(() -> {

            try {
                dataRange.forEach(___data -> {
                    try {
                        sharedData.put(___data);
                        System.out.println("Producing " + ___data);
                    } catch (InterruptedException e) {
                        System.out.println("Producer Interrupted.");
                        throw new RuntimeException();
                    }
                });
            }
            catch (RuntimeException e) {
                System.out.println("Final producer exit.");
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(delay);
                    //This will block if the sharedData is empty.
                    int data = sharedData.take();
                    System.out.println("Consuming " + data);
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer Interrupted.");
            }
        });

        consumer.start();
        producer.start();

        Delayer.delay(1000);

        producer.interrupt();
        consumer.interrupt();

        System.out.println("Done");
    }
}

The Delayer Code

package xyz.ronella.concurrency;

public class Delayer {

    private Delayer() {}

    public static void delay(long millis) {
        final Object mainMonitor = new Object();
        Thread mainTimer = new Thread(() -> {
            try {
                Thread.sleep(millis);
                synchronized (mainMonitor) {
                    mainMonitor.notify();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        mainTimer.start();

        synchronized (mainMonitor) {
            try {
                mainMonitor.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}