{"id":792,"date":"2019-06-07T14:35:25","date_gmt":"2019-06-07T02:35:25","guid":{"rendered":"https:\/\/www.ronella.xyz\/?p=792"},"modified":"2021-09-28T14:00:12","modified_gmt":"2021-09-28T01:00:12","slug":"sample-usage-of-java-9-flow-reactive-stream-api","status":"publish","type":"post","link":"https:\/\/www.ronella.xyz\/?p=792","title":{"rendered":"Sample Usage of Java 9 Flow (Reactive Stream) API"},"content":{"rendered":"<p><strong>Reactive stream<\/strong> is gaining traction in the mainstream programming and java has its own implementation via the <strong>Flow API<\/strong>. Popular reactive stream implementations are RxJava, Reactor and Akka.<\/p>\n<pre style=\"white-space: pre;\">import java.util.concurrent.Flow;\r\nimport java.util.concurrent.SubmissionPublisher;\r\nimport java.util.stream.IntStream;\r\n\r\npublic class Main {\r\n\r\n    \/**\r\n     * Sample subscriber implementation.\r\n     *\/\r\n    public static class Subscriber implements Flow.Subscriber&lt;Integer&gt; {\r\n\r\n        \/**\r\n         * Holds an instance of Flow.Subscription instance so that we can request what we can handle.\r\n         *\/\r\n        private Flow.Subscription subscription;\r\n\r\n        \/**\r\n         * Tracks if the publisher was closed.\r\n         *\/\r\n        private boolean isDone;\r\n\r\n        \/**\r\n         * Triggered on the initial subscription.\r\n         * @param subscription An instance of Flow.Subscription.\r\n         *\/\r\n        @Override\r\n        public void onSubscribe(Flow.Subscription subscription) {\r\n            System.out.println(\"Subscribed\");\r\n            this.subscription = subscription;\r\n            this.subscription.request(1);\r\n        }\r\n\r\n        \/**\r\n         * Do the actual processing.\r\n         * @param item The actual item currently being processed.\r\n         *\/\r\n        @Override\r\n        public void onNext(Integer item) {\r\n            System.out.println(\"Processing \" + item);\r\n            this.subscription.request(1);\r\n        }\r\n\r\n        \/**\r\n         * Holds how to handle error.\r\n         * @param throwable An instance of Throwable.\r\n         *\/\r\n        @Override\r\n        public void onError(Throwable throwable) {\r\n            throwable.printStackTrace();\r\n        }\r\n\r\n        \/**\r\n         * Called with the publisher was closed or completed.\r\n         *\/\r\n        @Override\r\n        public void onComplete() {\r\n            System.out.println(\"Processing done.\");\r\n            isDone = true;\r\n        }\r\n    }\r\n\r\n    public static void main(String[] args) throws InterruptedException {\r\n        \r\n        \/\/The publisher of the data.\r\n        SubmissionPublisher&lt;Integer&gt; publisher = new SubmissionPublisher&lt;&gt;();\r\n\r\n        \/\/The sample subscriber implementation.\r\n        Subscriber subscriber = new Subscriber();\r\n        \r\n        \/\/Register a subscriber.\r\n        publisher.subscribe(subscriber);\r\n\r\n        \/\/The sample stream to process.\r\n        var intData = IntStream.rangeClosed(1, 10);\r\n\r\n        \/\/Publish the stream data. \r\n        intData.forEach(publisher::submit);\r\n\r\n        \/\/The publisher is done.\r\n        publisher.close();\r\n\r\n        \/\/Since this is processing is asynchronous wait for everything to be processed. \r\n        while(!subscriber.isDone) {\r\n            Thread.sleep(10);\r\n        }\r\n        \r\n        System.out.println(\"Done\");\r\n    }\r\n}<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Reactive stream is gaining traction in the mainstream programming and java has its own implementation via the Flow API. Popular reactive stream implementations are RxJava, Reactor and Akka. import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.stream.IntStream; public class Main { \/** * Sample subscriber implementation. *\/ public static class Subscriber implements Flow.Subscriber&lt;Integer&gt; { \/** * Holds an [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[43,71],"tags":[],"_links":{"self":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts\/792"}],"collection":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=792"}],"version-history":[{"count":3,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts\/792\/revisions"}],"predecessor-version":[{"id":795,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts\/792\/revisions\/795"}],"wp:attachment":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=792"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=792"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=792"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}