Gatherers let you encode custom, often stateful, intermediate operations in a stream pipeline, going far beyond what map, filter, or flatMap can express.


1. Why Gatherers Exist

A Gatherer<T, A, R> describes how elements of type T flow through an intermediate stage, optionally using state A, and emitting elements of type R.

  • It can perform one‑to‑one, one‑to‑many, many‑to‑one, or many‑to‑many transformations.
  • It can maintain mutable state across elements, short‑circuit processing, and support parallel execution if given a combiner.
  • You attach it using Stream.gather(gatherer).

This is analogous to Collector for terminal operations, but acts mid‑pipeline instead of at the end.
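
To make the short-circuiting capability concrete, here is a minimal sketch of a gatherer that forwards elements up to and including the first one matching a condition, then stops the pipeline. The helper name takeUntil and the class TakeUntilDemo are hypothetical, not part of the JDK; the sketch uses Gatherer.ofSequential, which section 3 covers.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class TakeUntilDemo {

    // Hypothetical helper (not a JDK method): forwards elements up to and
    // including the first one that matches `stop`, then requests no more input.
    static <T> Gatherer<T, ?, T> takeUntil(Predicate<? super T> stop) {
        return Gatherer.ofSequential(
                (Gatherer.Integrator<Void, T, T>) (state, element, downstream) ->
                        // Returning false means "send no more elements": either
                        // downstream refused the push or the stop condition was met.
                        downstream.push(element) && !stop.test(element)
        );
    }

    static List<Integer> firstUpToThree() {
        // The source is infinite; the pipeline ends only because the
        // gatherer short-circuits.
        return Stream.iterate(1, n -> n + 1)
                     .gather(takeUntil(n -> n >= 3))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstUpToThree()); // [1, 2, 3]
    }
}
```

The boolean returned by the integrator is the only signal needed here: no state, combiner, or finisher is involved.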


2. Gatherer.of – Building Parallel‑Capable Gatherers

You typically create gatherers with the static Gatherer.of factory methods.

A key overload is:

static <T, A, R> Gatherer<T, A, R> of(
        java.util.function.Supplier<A> initializer,
        Gatherer.Integrator<A, T, R> integrator,
        java.util.function.BinaryOperator<A> combiner,
        java.util.function.BiConsumer<A, Gatherer.Downstream<? super R>> finisher
)

2.1 Arguments

  • Supplier<A> initializer – creates the mutable state A for each pipeline branch.
  • Gatherer.Integrator<A, T, R> integrator – per‑element logic; updates state, may push outputs, and controls short‑circuit via its boolean return.
  • BinaryOperator<A> combiner – merges two states when running in parallel.
  • BiConsumer<A, Downstream<? super R>> finisher – flushes remaining state at the end of processing.

There are simpler overloads (e.g., stateless, no finisher) when you don’t need all four.
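
The four arguments are easier to follow in a small many-to-one example. The sketch below sums integers and emits a single result from the finisher; the names sum, Total, and SumGathererDemo are illustrative assumptions, and a plain reduce would normally be the better tool for this job.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class SumGathererDemo {

    // Hypothetical mutable state: one running total per pipeline branch
    static final class Total {
        long value;
    }

    static Gatherer<Integer, ?, Long> sum() {
        return Gatherer.<Integer, Total, Long>of(
                // initializer: fresh state for each branch
                Total::new,
                // integrator: accumulate only, push nothing, never short-circuit
                Gatherer.Integrator.ofGreedy((total, element, downstream) -> {
                    total.value += element;
                    return true;
                }),
                // combiner: merge the states of two parallel branches
                (left, right) -> {
                    left.value += right.value;
                    return left;
                },
                // finisher: emit the single result once all input is consumed
                (total, downstream) -> downstream.push(Long.valueOf(total.value))
        );
    }

    static List<Long> parallelSum() {
        return Stream.of(1, 2, 3, 4, 5).parallel().gather(sum()).toList();
    }

    public static void main(String[] args) {
        System.out.println(parallelSum()); // [15]
    }
}
```

Because the finisher always pushes, this sketch emits [0] for an empty stream; whether that is desirable depends on the use case.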

2.2 Example: Custom map Using Gatherer.of (Stateless, Parallelizable)

From the JDK docs, a gatherer equivalent to Stream.map can be written as:

import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
    // stateless; state type is Void, no initializer/combiner needed
    return Gatherer.of(
            (Gatherer.Integrator<Void, T, R>) (state, element, downstream) ->
                    // return push's result so that a downstream which wants
                    // no more elements (e.g. limit) can stop this stage
                    downstream.push(mapper.apply(element))
    );
}

void main() {
    Stream.of("a", "bb", "ccc")
          .gather(map(String::length))
          .forEach(System.out::println);
}

  • The gatherer is parallelizable because it was created with Gatherer.of, and it is stateless (Void state), so there is nothing to combine between branches.
  • Rationale: for a simple one‑to‑one transformation, no state or finisher is needed; the integrator only pushes mapped elements.

3. Gatherer.ofSequential – Sequential‑Only Gatherers

For logic that is inherently sequential or where you don’t care about parallel execution, you use ofSequential.

Typical overloads:

static <T, R> Gatherer<T, Void, R> ofSequential(
        Gatherer.Integrator<Void, T, R> integrator
)

static <T, A, R> Gatherer<T, A, R> ofSequential(
        java.util.function.Supplier<A> initializer,
        Gatherer.Integrator<A, T, R> integrator
)

static <T, A, R> Gatherer<T, A, R> ofSequential(
        java.util.function.Supplier<A> initializer,
        Gatherer.Integrator<A, T, R> integrator,
        java.util.function.BiConsumer<A, Gatherer.Downstream<? super R>> finisher
)

  • These gatherers take no combiner, so the gather stage is always evaluated sequentially. They can still be used in a parallel stream; only the gather stage itself gives up parallelism.
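
As an illustration of the three-argument overload, the following sketch groups consecutive equal elements into lists and relies on the finisher to flush the last group. The helper runs and the class RunsDemo are hypothetical names introduced for this example only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class RunsDemo {

    // Hypothetical helper: groups consecutive equal elements into lists ("runs")
    static <T> Gatherer<T, ?, List<T>> runs() {
        class State {
            List<T> run = new ArrayList<>(); // the run currently being built
        }
        return Gatherer.<T, State, List<T>>ofSequential(
                State::new,
                (state, element, downstream) -> {
                    boolean startsNewRun = !state.run.isEmpty()
                            && !Objects.equals(state.run.getLast(), element);
                    if (startsNewRun) {
                        // emit the finished run and start a new one
                        List<T> finished = state.run;
                        state.run = new ArrayList<>();
                        state.run.add(element);
                        return downstream.push(finished);
                    }
                    state.run.add(element);
                    return true;
                },
                (state, downstream) -> {
                    // without this flush the last run would be lost
                    if (!state.run.isEmpty()) {
                        downstream.push(state.run);
                    }
                }
        );
    }

    static List<List<String>> demo() {
        return Stream.of("a", "a", "b", "c", "c", "c")
                     .gather(RunsDemo.<String>runs())
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[a, a], [b], [c, c, c]]
    }
}
```

The finisher is what distinguishes this overload: any gatherer that buffers elements needs one, or the tail of the stream is silently dropped.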

3.1 Example: Prefix Scan Using ofSequential

The JDK docs show a prefix scan implemented with ofSequential:

import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static <T, R> Gatherer<T, ?, R> scan(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> scanner
) {
    class State {
        R current = initial.get();
    }

    return Gatherer.<T, State, R>ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.current = scanner.apply(state.current, element);
                return downstream.push(state.current); // emit new prefix
            })
    );
}

void main() {
    var numberStrings =
            Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                  .gather(scan(() -> "", (string, number) -> string + number))
                  .toList();

    System.out.println(numberStrings);
}
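
  • Printed output: [1, 12, 123, 1234, 12345, 123456, 1234567, 12345678, 123456789]. Each element is a String; List.toString prints them without quotes.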
  • Rationale: prefix scan is inherently order‑sensitive and naturally modeled as sequential; ofSequential expresses that contract directly.
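
In practice you may not need to write scan yourself: the Gatherers class ships a built-in scan with the same shape of arguments. A brief sketch of a running sum using it follows; the class and method names are illustrative.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class BuiltInScanDemo {

    static List<Integer> runningSums() {
        return Stream.of(1, 2, 3, 4)
                     // initial value 0; each step emits the updated running total
                     .gather(Gatherers.scan(() -> 0, (sum, n) -> sum + n))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningSums()); // [1, 3, 6, 10]
    }
}
```

As with the hand-written version, the initial value itself is not emitted; only the result after each element is.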

4. Declaring a Sink Gatherer

A gatherer is not required to push anything downstream. The following “log‑only” gatherer consumes every element and never forwards it:

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public static Gatherer<String, ?, String> loggingSink() {
    return Gatherer.ofSequential(
            (Gatherer.Integrator<Void, String, String>) (state, element, downstream) -> {
                System.out.println("LOG: " + element);
                // Don't push anything downstream - just log and continue
                return true;
            }
    );
}

void main() {
    Stream.of("one", "two", "three")
          .gather(loggingSink())
          .forEach(s -> System.out.println("Downstream got: " + s)); // prints nothing downstream
}
  • Here, the downstream will see nothing; the only observable effect is the logging side‑effect.

5. Built‑In Gatherer: windowSliding

The Gatherers.windowSliding method provides sliding windows as lists.

Signature:

static <T> java.util.stream.Gatherer<T, ?, java.util.List<T>> windowSliding(int windowSize)

Behavior:

  • Produces overlapping windows of size windowSize in encounter order.
  • Each new window drops the oldest element and adds the next.
  • If the stream is empty, no windows; if shorter than windowSize, one window containing all elements.

5.1 Example: Sliding Windows of Integers

import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

void main() {
    List<List<Integer>> windows =
            Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                  .gather(Gatherers.windowSliding(3))
                  .toList();

    windows.forEach(System.out::println);
}

Expected result: [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]].

Rationale:

  • Sliding windows are a classic stateful pattern that requires remembering the last windowSize - 1 elements.
  • Implementing this manually with map/flatMap is error‑prone; windowSliding encapsulates it as a reusable gatherer.
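
The edge cases listed under Behavior can be checked directly. The sketch below wraps windowSliding in a small hypothetical helper, slide, and runs it on an empty input, an input shorter than the window, and an input exactly as long as the window.

```java
import java.util.List;
import java.util.stream.Gatherers;

class WindowEdgeCases {

    // Hypothetical convenience wrapper around Gatherers.windowSliding
    static List<List<Integer>> slide(List<Integer> input, int size) {
        return input.stream()
                    .gather(Gatherers.windowSliding(size))
                    .toList();
    }

    public static void main(String[] args) {
        System.out.println(slide(List.of(), 3));        // []
        System.out.println(slide(List.of(1, 2), 3));    // [[1, 2]]
        System.out.println(slide(List.of(1, 2, 3), 3)); // [[1, 2, 3]]
    }
}
```

Note that a short input still yields one undersized window, so code that assumes every window has exactly windowSize elements should check the size.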

6. Built‑In Gatherer: mapConcurrent

mapConcurrent applies a function concurrently using virtual threads while preserving stream order. Signature:

static <T, R> java.util.stream.Gatherer<T, ?, R> mapConcurrent(
        int maxConcurrency,
        java.util.function.Function<? super T, ? extends R> mapper
)

Behavior:

  • Executes mapper concurrently with up to maxConcurrency in‑flight tasks.
  • Uses virtual threads (Loom), so it scales well for blocking tasks.
  • Preserves encounter order when emitting results downstream.
  • Attempts to cancel in‑progress tasks when downstream no longer wants more elements.

6.1 Example: Concurrent “Remote” Work

import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class MapConcurrentDemo {

    private static String fetchRemote(String id) {
        try {
            Thread.sleep(300); // simulate blocking IO
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response-for-" + id + " on " + Thread.currentThread();
    }

    static void main() {
        List<String> responses =
                Stream.of("A", "B", "C", "D", "E")
                      .gather(Gatherers.mapConcurrent(3, MapConcurrentDemo::fetchRemote))
                      .toList();

        responses.forEach(System.out::println);
    }
}
  • Up to 3 virtual threads will run fetchRemote concurrently.
  • The result list preserves the order "A", "B", "C", "D", "E".

Rationale:

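  • Compared to parallel(), which by default runs on the common fork/join pool sized for CPU‑bound work, mapConcurrent makes the concurrency limit explicit and runs each mapper call on its own virtual thread, which suits blocking I/O. Results are still emitted in encounter order.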
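
The ordering and virtual-thread claims can be observed with a small experiment. In the sketch below, earlier elements deliberately take longer than later ones, yet results arrive in input order. The names slowTask, Result, and MapConcurrentOrderDemo are hypothetical, and the sleep durations are arbitrary.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class MapConcurrentOrderDemo {

    // Records the input and whether the mapper ran on a virtual thread
    record Result(int input, boolean onVirtualThread) {}

    static Result slowTask(Integer n) {
        try {
            Thread.sleep(50L * (5 - n)); // earlier elements sleep longer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(n, Thread.currentThread().isVirtual());
    }

    static List<Result> run() {
        return Stream.of(1, 2, 3, 4)
                     // at most 2 mapper calls in flight at any time
                     .gather(Gatherers.mapConcurrent(2, MapConcurrentOrderDemo::slowTask))
                     .toList();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Even though element 2 finishes before element 1, it is held back until element 1 has been emitted, which is the cost of preserving encounter order.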

7. Putting It All Together

You now have:

  • Gatherer.of to build parallel‑capable gatherers when you need full control, including a combiner and finisher.
  • Gatherer.ofSequential for simpler or inherently sequential logic, with examples like prefix scan.
  • Gatherers.windowSliding and Gatherers.mapConcurrent as high‑level, ready‑made gatherers for windowing and concurrent mapping.

With these building blocks, you can design expressive, stateful, and performance‑aware stream pipelines using the Stream Gatherers API (java.util.stream.Gatherer and Gatherers), which is final as of JDK 24.