Sample Usage of Java 9 Flow (Reactive Stream) API

Reactive streams are gaining traction in mainstream programming, and Java ships its own support for them through the Flow API. Popular reactive stream implementations include RxJava, Reactor and Akka.

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

/**
 * Demonstrates the Java 9 {@link Flow} API: a {@link SubmissionPublisher} emits the integers
 * 1 to 10 to a single {@link Subscriber}, which pulls them one at a time.
 */
public class Main {

    /** Pause between two checks of the subscriber's terminal state, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10;

    /**
     * Sample subscriber implementation. It prints every item it receives and requests exactly
     * one further item after each delivery, so the publisher never pushes more than the
     * subscriber has asked for.
     */
    public static class Subscriber implements Flow.Subscriber<Integer> {

        /**
         * The subscription handed over in {@link #onSubscribe}; used to request the next item.
         */
        private Flow.Subscription subscription;

        /**
         * Set once a terminal signal ({@link #onComplete} or {@link #onError}) has arrived.
         * Declared {@code volatile} because it is written by the thread delivering the signal
         * and read by whichever thread polls {@link #isDone()}.
         */
        private volatile boolean done;

        /**
         * Triggered on the initial subscription. Stores the subscription and requests the
         * first item.
         *
         * @param subscription An instance of Flow.Subscription.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Subscribed");
            this.subscription = subscription;
            this.subscription.request(1);
        }

        /**
         * Does the actual processing (here: printing) and then requests the next item.
         *
         * @param item The item currently being processed.
         */
        @Override
        public void onNext(Integer item) {
            System.out.println("Processing " + item);
            this.subscription.request(1);
        }

        /**
         * Handles a failure signal by printing its stack trace. An error is a terminal
         * signal, so the subscriber is marked as done; otherwise a caller waiting on
         * {@link #isDone()} would wait forever.
         *
         * @param throwable An instance of Throwable.
         */
        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            done = true;
        }

        /**
         * Called when the publisher was closed or completed.
         */
        @Override
        public void onComplete() {
            System.out.println("Processing done.");
            done = true;
        }

        /**
         * Reports whether a terminal signal has been received.
         *
         * @return {@code true} once {@link #onComplete} or {@link #onError} has been called.
         */
        public boolean isDone() {
            return done;
        }
    }

    /**
     * Publishes the integers 1 to 10 to a {@link Subscriber} and blocks until the subscriber
     * has received a terminal signal.
     *
     * @param args Command-line arguments (unused).
     * @throws InterruptedException If the main thread is interrupted while waiting.
     */
    public static void main(String[] args) throws InterruptedException {

        //The sample subscriber implementation.
        Subscriber subscriber = new Subscriber();

        //The publisher of the data. try-with-resources closes it even if submit() throws,
        //so the subscriber always receives a terminal signal.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {

            //Register a subscriber.
            publisher.subscribe(subscriber);

            //Publish the sample stream data.
            IntStream.rangeClosed(1, 10).forEach(publisher::submit);
        }

        //Since the processing is asynchronous, wait for everything to be processed.
        while (!subscriber.isDone()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }

        System.out.println("Done");
    }
}