CyclicBarrier lets a fixed number of threads wait for each other at a common barrier point. Once the waiting threads are released, the same barrier can be used again and again (which is why it is called cyclic).
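
To make the cyclic part concrete, here is a minimal sketch, assuming two plain threads and three rounds. The class name CyclicReuseDemo, the countTrips helper, and the round count are illustrative and are not part of the complete code at the bottom. It reuses one barrier for every round and counts how often the barrier opens.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names and the round count are illustrative only.
final class CyclicReuseDemo {

    // Runs `rounds` meetings on ONE barrier shared by two threads and
    // returns how many times the barrier opened.
    static int countTrips(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time both parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);

        Runnable worker = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // blocks until the other thread arrives too
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
        };

        Thread first = new Thread(worker);
        Thread second = new Thread(worker);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return trips.get();
    }

    public static void main(String[] args) {
        // Prints: Barrier opened 3 times
        System.out.println("Barrier opened " + countTrips(3) + " times");
    }
}
```

No new barrier is created between rounds; the same instance resets itself after each release.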

Creating a CyclicBarrier aware Task
  1. Create a task that accepts an instance of CyclicBarrier, as in the following snippet (the complete code is at the bottom).
    public Teller(CyclicBarrier barrier, String message) {
        _barrier = barrier;
        _message = message;
    }
  2. Within the task, call the await method of the CyclicBarrier instance, as in the following snippet (the complete code is at the bottom). Each call blocks until the number of threads waiting at the barrier reaches the number of parties (normally the first or only argument of the constructor) specified when the barrier was created.
    @Override
    public String call() throws Exception {
        System.out.println("Processing: " + _message);
        _barrier.await();
        return _message;
    }
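
How the barrier counts calls to await can be observed through the method's return value, which is the caller's arrival index: getParties() - 1 for the first thread to arrive and 0 for the last. The following is a minimal sketch under that assumption; the class name ArrivalIndexDemo and the arrivalIndexes helper are hypothetical and are not part of the complete code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
final class ArrivalIndexDemo {

    // Lets `parties` tasks meet at one barrier and returns the sorted set
    // of values that await() handed back to them.
    static Set<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        // One thread per party, so every task can reach the barrier.
        ExecutorService executor = Executors.newFixedThreadPool(parties);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                // await() returns this caller's arrival index.
                Callable<Integer> task = () -> barrier.await();
                futures.add(executor.submit(task));
            }
            Set<Integer> indexes = new TreeSet<>();
            for (Future<Integer> future : futures) {
                indexes.add(future.get());
            }
            return indexes;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints: [0, 1]
        System.out.println(arrivalIndexes(2));
    }
}
```

Which task receives which index depends on scheduling, but every index from 0 to parties - 1 is handed out exactly once per opening of the barrier.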
Using the CyclicBarrier aware Task
  1. Create an instance of the CyclicBarrier class with the expected number of parties (normally the first or only argument of the constructor). Once that many threads are waiting, the barrier releases all of them and then goes back to blocking. Optionally, pass a second argument of type Runnable, which is invoked once each time the barrier opens, before the waiting threads are released, as in the following snippet (the complete code is at the bottom).
    CyclicBarrier barrier = new CyclicBarrier(2 /*The number of parties. */
    , () -> System.out.println("Barrier open.") /*The logic to call when the barrier unblocks. */
    );
  2. Submit a number of tasks that is a multiple of the number of parties from step 1 (a multiple of 2 here), as in the following snippet (the complete code is at the bottom). The executor also needs at least as many threads as there are parties; otherwise the tasks can never all reach the barrier together:
    futures.add(executor.submit(new Teller(barrier, "One")));
    futures.add(executor.submit(new Teller(barrier, "Two")));
    futures.add(executor.submit(new Teller(barrier, "Three")));
    futures.add(executor.submit(new Teller(barrier, "Four")));
    
    futures.forEach(future -> {
        try {
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    });
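
If the number of tasks is not a multiple of the number of parties, the leftover task waits at the barrier with nobody to meet. A plain await() would block indefinitely in that case, so the following sketch uses the timed overload await(long, TimeUnit) to make the situation visible. The class name LonelyPartyDemo, the awaitAlone helper, and the 200 ms timeout are assumptions made for illustration only.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; the names and the timeout are illustrative only.
final class LonelyPartyDemo {

    // A single caller waits on a barrier that expects two parties.
    static String awaitAlone(long timeoutMillis) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            // Nobody else will ever arrive, so this can only time out.
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return "released";
        } catch (TimeoutException e) {
            // A waiter that times out leaves the barrier in the broken state.
            return "timed out, broken=" + barrier.isBroken();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints: timed out, broken=true
        System.out.println(awaitAlone(200));
    }
}
```

A broken barrier makes later await calls fail with BrokenBarrierException until reset() is called, so a timeout is a way to detect the problem rather than a way to continue normally.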
Observation

Upon running the complete code:

  • The following message is printed each time two tasks have reached the barrier, that is, after every second processing message:
    Barrier open.
  • The processing messages for the following tasks (in either order) are never displayed before the processing messages for One and Two (in either order), because both pool threads are held at the barrier until it opens:
    Three
    Four
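
These observations can also be checked programmatically. The sketch below is a variation of the complete code that records events in a shared list instead of printing them; the class name BarrierOrderDemo and the runAndRecord helper are hypothetical, and it assumes the same setup of two pool threads and two parties.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
final class BarrierOrderDemo {

    // Runs four tasks on a two-thread pool with a two-party barrier and
    // returns the events in the order they happened.
    static List<String> runAndRecord() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CyclicBarrier barrier = new CyclicBarrier(2, () -> events.add("Barrier open."));
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String message : new String[] {"One", "Two", "Three", "Four"}) {
                Callable<String> task = () -> {
                    events.add("Processing: " + message);
                    barrier.await();
                    return message;
                };
                futures.add(executor.submit(task));
            }
            // Wait until every task has passed the barrier.
            for (Future<String> future : futures) {
                future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        runAndRecord().forEach(System.out::println);
    }
}
```

In the recorded list, the two "Barrier open." entries sit at positions 2 and 5 (counting from 0): each follows exactly two processing entries, and the entries for Three and Four only appear after the first opening.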
The Complete Code
package xyz.ronella.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Barrier {

    public static void main(String[] args) {

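        // Local task class; Callable<String> lets Future.get() return the message as a String.
        class Teller implements Callable<String> {

            private final CyclicBarrier _barrier;
            private final String _message;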

            public Teller(CyclicBarrier barrier, String message) {
                _barrier = barrier;
                _message = message;
            }

            @Override
            public String call() throws Exception {
                System.out.println("Processing: " + _message);
                _barrier.await();
                return _message;
            }
        }

        // The pool needs at least as many threads as the barrier has parties.
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CyclicBarrier barrier = new CyclicBarrier(2 /*The number of parties. */
                , () -> System.out.println("Barrier open.") /*The logic to call when the barrier unblocks. */
        );

        List<Future<String>> futures = new ArrayList<>();

        try {
            // Four tasks with two parties: the barrier opens twice.
            futures.add(executor.submit(new Teller(barrier, "One")));
            futures.add(executor.submit(new Teller(barrier, "Two")));
            futures.add(executor.submit(new Teller(barrier, "Three")));
            futures.add(executor.submit(new Teller(barrier, "Four")));

            // get() blocks until the task has passed the barrier and returned.
            futures.forEach(future -> {
                try {
                    System.out.println(future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
        }
        finally {
            executor.shutdown();
        }
    }
}