{"id":2112,"date":"2026-02-07T01:41:06","date_gmt":"2026-02-06T12:41:06","guid":{"rendered":"https:\/\/www.ronella.xyz\/?p=2112"},"modified":"2026-02-07T01:41:07","modified_gmt":"2026-02-06T12:41:07","slug":"java-stream-gatherers","status":"publish","type":"post","link":"https:\/\/www.ronella.xyz\/?p=2112","title":{"rendered":"Java Stream Gatherers"},"content":{"rendered":"<p>Gatherers let you encode custom, often stateful, intermediate operations in a stream pipeline, going far beyond what <code>map<\/code>, <code>filter<\/code>, or <code>flatMap<\/code> can express.<\/p>\n<hr \/>\n<h2>1. Why Gatherers Exist<\/h2>\n<p>A <code>Gatherer&lt;T, A, R&gt;<\/code> describes how elements of type <code>T<\/code> flow through an intermediate stage, optionally using state <code>A<\/code>, and emitting elements of type <code>R<\/code>. <\/p>\n<ul>\n<li>It can perform one\u2011to\u2011one, one\u2011to\u2011many, many\u2011to\u2011one, or many\u2011to\u2011many transformations.<\/li>\n<li>It can maintain mutable state across elements, short\u2011circuit processing, and support parallel execution if given a combiner.<\/li>\n<li>You attach it using <code>Stream.gather(gatherer)<\/code>.<\/li>\n<\/ul>\n<p>This is analogous to <code>Collector<\/code> for terminal operations, but acts <em>mid\u2011pipeline<\/em> instead of at the end. <\/p>\n<hr \/>\n<h2>2. <code>Gatherer.of<\/code> \u2013 Building Parallel\u2011Capable Gatherers<\/h2>\n<p>You typically create gatherers using the static <code>of<\/code> factory methods.<\/p>\n<p>A key overload is: <\/p>\n<pre><code class=\"language-java\">static &lt;T, A, R&gt; Gatherer&lt;T, A, R&gt; of(\n        java.util.function.Supplier&lt;A&gt; initializer,\n        Gatherer.Integrator&lt;A, T, R&gt; integrator,\n        java.util.function.BinaryOperator&lt;A&gt; combiner,\n        java.util.function.BiConsumer&lt;A, Gatherer.Downstream&lt;? 
super R&gt;&gt; finisher\n)<\/code><\/pre>\n<h3>2.1 Arguments<\/h3>\n<ul>\n<li><code>Supplier&lt;A&gt; initializer<\/code> \u2013 creates the mutable state <code>A<\/code> for each pipeline branch. <\/li>\n<li><code>Gatherer.Integrator&lt;A, T, R&gt; integrator<\/code> \u2013 per\u2011element logic; updates state, may <code>push<\/code> outputs, and controls short\u2011circuit via its boolean return.<\/li>\n<li><code>BinaryOperator&lt;A&gt; combiner<\/code> \u2013 merges two states when running in parallel.<\/li>\n<li><code>BiConsumer&lt;A, Downstream&lt;? super R&gt;&gt; finisher<\/code> \u2013 flushes remaining state at the end of processing.<\/li>\n<\/ul>\n<p>There are simpler overloads (e.g., stateless, no finisher) when you don\u2019t need all four.<\/p>\n<h3>2.2 Example: Custom <code>map<\/code> Using <code>Gatherer.of<\/code> (Stateless, Parallelizable)<\/h3>\n<p>From the JDK docs, a gatherer equivalent to <code>Stream.map<\/code> can be written as:<\/p>\n<pre><code class=\"language-java\">import java.util.function.Function;\nimport java.util.stream.Gatherer;\nimport java.util.stream.Stream;\n\npublic static &lt;T, R&gt; Gatherer&lt;T, ?, R&gt; map(Function&lt;? super T, ? 
extends R&gt; mapper) {\n    \/\/ stateless; state type is Void, no initializer\/combiner needed\n    return Gatherer.of(\n            (Gatherer.Integrator&lt;Void, T, R&gt;) (state, element, downstream) -&gt; {\n                downstream.push(mapper.apply(element));\n                return true; \/\/ continue\n            }\n    );\n}\n\nvoid main() {\n    Stream.of(&quot;a&quot;, &quot;bb&quot;, &quot;ccc&quot;)\n          .gather(map(String::length))\n          .forEach(System.out::println);\n}<\/code><\/pre>\n<ul>\n<li>The gatherer is <em>parallelizable<\/em> because we used <code>of<\/code>, and it\u2019s stateless (<code>Void<\/code> state).<\/li>\n<li>Rationale: for a simple one\u2011to\u2011one transformation, no state or finisher is needed; the integrator only pushes mapped elements.<\/li>\n<\/ul>\n<hr \/>\n<h2>3. <code>Gatherer.ofSequential<\/code> \u2013 Sequential\u2011Only Gatherers<\/h2>\n<p>For logic that is inherently sequential or where you don\u2019t care about parallel execution, you use <code>ofSequential<\/code>.<\/p>\n<p>Typical overloads:<\/p>\n<pre><code class=\"language-java\">static &lt;T, R&gt; Gatherer&lt;T, Void, R&gt; ofSequential(\n        Gatherer.Integrator&lt;Void, T, R&gt; integrator\n)\n\nstatic &lt;T, A, R&gt; Gatherer&lt;T, A, R&gt; ofSequential(\n        java.util.function.Supplier&lt;A&gt; initializer,\n        Gatherer.Integrator&lt;A, T, R&gt; integrator\n)\n\nstatic &lt;T, A, R&gt; Gatherer&lt;T, A, R&gt; ofSequential(\n        java.util.function.Supplier&lt;A&gt; initializer,\n        Gatherer.Integrator&lt;A, T, R&gt; integrator,\n        java.util.function.BiConsumer&lt;A, Gatherer.Downstream&lt;? 
super R&gt;&gt; finisher\n)<\/code><\/pre>\n<ul>\n<li>These gatherers have no combiner, so the gather stage itself is always evaluated sequentially. They can still appear in a parallel stream; in that case only the upstream and downstream operations run in parallel.<\/li>\n<\/ul>\n<h3>3.1 Example: Prefix Scan Using <code>ofSequential<\/code><\/h3>\n<p>The JDK docs show a prefix scan implemented with <code>ofSequential<\/code>:<\/p>\n<pre><code class=\"language-java\">import java.util.function.BiFunction;\nimport java.util.function.Supplier;\nimport java.util.stream.Gatherer;\nimport java.util.stream.Stream;\n\npublic static &lt;T, R&gt; Gatherer&lt;T, ?, R&gt; scan(\n        Supplier&lt;R&gt; initial,\n        BiFunction&lt;? super R, ? super T, ? extends R&gt; scanner\n) {\n    class State {\n        R current = initial.get();\n    }\n\n    return Gatherer.&lt;T, State, R&gt;ofSequential(\n            State::new,\n            Gatherer.Integrator.ofGreedy((state, element, downstream) -&gt; {\n                state.current = scanner.apply(state.current, element);\n                return downstream.push(state.current); \/\/ emit new prefix\n            })\n    );\n}\n\nvoid main() {\n    var numberStrings =\n            Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)\n                  .gather(scan(() -&gt; &quot;&quot;, (string, number) -&gt; string + number))\n                  .toList();\n\n    System.out.println(numberStrings);\n}<\/code><\/pre>\n<ul>\n<li>Printed output: <code>[1, 12, 123, 1234, 12345, 123456, 1234567, 12345678, 123456789]<\/code>.<\/li>\n<li>Rationale: a prefix scan is inherently order\u2011sensitive, because each output depends on every element before it; <code>ofSequential<\/code> expresses that contract directly.<\/li>\n<\/ul>\n<hr \/>\n<h2>4. 
Declaring a Sink Gatherer<\/h2>\n<p>Example of a \u201clog\u2011only\u201d gatherer that never forwards elements:<\/p>\n<pre><code class=\"language-java\">import java.util.stream.Gatherer;\nimport java.util.stream.Stream;\n\npublic static Gatherer&lt;String, ?, String&gt; loggingSink() {\n    return Gatherer.ofSequential(\n            (Gatherer.Integrator&lt;Void, String, String&gt;) (state, element, downstream) -&gt; {\n                System.out.println(&quot;LOG: &quot; + element);\n                \/\/ Don&#039;t push anything downstream - just log and continue\n                return true;\n            }\n    );\n}\n\nvoid main() {\n    Stream.of(&quot;one&quot;, &quot;two&quot;, &quot;three&quot;)\n          .gather(loggingSink())\n          .forEach(s -&gt; System.out.println(&quot;Downstream got: &quot; + s)); \/\/ prints nothing downstream\n}<\/code><\/pre>\n<ul>\n<li>Here, the downstream will see nothing; the only observable effect is the logging side\u2011effect.<\/li>\n<\/ul>\n<hr \/>\n<h2>5. Built\u2011In Gatherer: <code>windowSliding<\/code><\/h2>\n<p>The <code>Gatherers.windowSliding<\/code> method provides sliding windows as lists.<\/p>\n<p>Signature: <\/p>\n<pre><code class=\"language-java\">static &lt;T&gt; java.util.stream.Gatherer&lt;T, ?, java.util.List&lt;T&gt;&gt; windowSliding(int windowSize)<\/code><\/pre>\n<p>Behavior:<\/p>\n<ul>\n<li>Produces overlapping windows of size <code>windowSize<\/code> in encounter order.  <\/li>\n<li>Each new window drops the oldest element and adds the next.  <\/li>\n<li>If the stream is empty, no windows; if shorter than <code>windowSize<\/code>, one window containing all elements.  
<\/li>\n<\/ul>\n<h3>5.1 Example: Sliding Windows of Integers<\/h3>\n<pre><code class=\"language-java\">import java.util.List;\nimport java.util.stream.Gatherers;\nimport java.util.stream.Stream;\n\nvoid main() {\n    List&lt;List&lt;Integer&gt;&gt; windows =\n            Stream.of(1, 2, 3, 4, 5, 6, 7, 8)\n                  .gather(Gatherers.windowSliding(3))\n                  .toList();\n\n    windows.forEach(System.out::println);\n}<\/code><\/pre>\n<p>Expected result: <code>[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]]<\/code>.<\/p>\n<p>Rationale:<\/p>\n<ul>\n<li>Sliding windows are a classic stateful pattern: producing each window requires remembering the last <code>windowSize - 1<\/code> elements.<\/li>\n<li>Implementing this manually with <code>map<\/code>\/<code>flatMap<\/code> is error\u2011prone; <code>windowSliding<\/code> encapsulates it as a reusable gatherer.<\/li>\n<\/ul>\n<hr \/>\n<h2>6. Built\u2011In Gatherer: <code>mapConcurrent<\/code><\/h2>\n<p><code>mapConcurrent<\/code> applies a function concurrently using virtual threads while preserving stream order. Signature:<\/p>\n<pre><code class=\"language-java\">static &lt;T, R&gt; java.util.stream.Gatherer&lt;T, ?, R&gt; mapConcurrent(\n        int maxConcurrency,\n        java.util.function.Function&lt;? super T, ? extends R&gt; mapper\n)<\/code><\/pre>\n<p>Behavior:<\/p>\n<ul>\n<li>Executes <code>mapper<\/code> concurrently with up to <code>maxConcurrency<\/code> in\u2011flight tasks.  
<\/li>\n<li>Uses virtual threads (Project Loom), so it scales well for blocking tasks.<\/li>\n<li>Preserves encounter order when emitting results downstream.<\/li>\n<li>Attempts to cancel in\u2011progress tasks when downstream no longer wants more elements.<\/li>\n<\/ul>\n<h3>6.1 Example: Concurrent \u201cRemote\u201d Work<\/h3>\n<pre><code class=\"language-java\">import java.util.List;\nimport java.util.stream.Gatherers;\nimport java.util.stream.Stream;\n\npublic class MapConcurrentDemo {\n\n    private static String fetchRemote(String id) {\n        try {\n            Thread.sleep(300); \/\/ simulate blocking IO\n        } catch (InterruptedException e) {\n            Thread.currentThread().interrupt(); \/\/ restore the interrupt flag\n        }\n        return &quot;response-for-&quot; + id + &quot; on &quot; + Thread.currentThread();\n    }\n\n    static void main() {\n        List&lt;String&gt; responses =\n                Stream.of(&quot;A&quot;, &quot;B&quot;, &quot;C&quot;, &quot;D&quot;, &quot;E&quot;)\n                      .gather(Gatherers.mapConcurrent(3, MapConcurrentDemo::fetchRemote))\n                      .toList();\n\n        responses.forEach(System.out::println);\n    }\n}<\/code><\/pre>\n<ul>\n<li>Up to 3 virtual threads will run <code>fetchRemote<\/code> concurrently.<\/li>\n<li>The result list preserves the order <code>&quot;A&quot;, &quot;B&quot;, &quot;C&quot;, &quot;D&quot;, &quot;E&quot;<\/code>.<\/li>\n<\/ul>\n<p>Rationale:<\/p>\n<ul>\n<li>Compared to <code>parallel()<\/code>, <code>mapConcurrent<\/code> makes the concurrency level explicit, is suited to blocking IO, and preserves encounter order, which makes it a better fit for IO\u2011bound work such as remote calls.<\/li>\n<\/ul>\n<hr \/>\n<h2>7. 
Putting It All Together<\/h2>\n<p>You now have:<\/p>\n<ul>\n<li><strong><code>Gatherer.of<\/code><\/strong> to build parallel\u2011capable gatherers when you need full control, including a combiner and finisher.<\/li>\n<li><strong><code>Gatherer.ofSequential<\/code><\/strong> for simpler or inherently sequential logic, with examples like prefix scan. <\/li>\n<li><strong><code>Gatherers.windowSliding<\/code><\/strong> and <strong><code>Gatherers.mapConcurrent<\/code><\/strong> as high\u2011level, ready\u2011made gatherers for windowing and concurrent mapping.<\/li>\n<\/ul>\n<p>With these building blocks, you can design expressive, stateful, and performance\u2011aware stream pipelines using the latest Java Stream API.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Gatherers let you encode custom, often stateful, intermediate operations in a stream pipeline, going far beyond what map, filter, or flatMap can express. 1. Why Gatherers Exist A Gatherer&lt;T, A, R&gt; describes how elements of type T flow through an intermediate stage, optionally using state A, and emitting elements of type R. 
It can perform [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[93],"tags":[],"_links":{"self":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts\/2112"}],"collection":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=2112"}],"version-history":[{"count":1,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts\/2112\/revisions"}],"predecessor-version":[{"id":2113,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=\/wp\/v2\/posts\/2112\/revisions\/2113"}],"wp:attachment":[{"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=2112"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=2112"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.ronella.xyz\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=2112"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}