Gatherers let you encode custom, often stateful, intermediate operations in a stream pipeline, going far beyond what map, filter, or flatMap can express.
1. Why Gatherers Exist
A Gatherer<T, A, R> describes how elements of type T flow through an intermediate stage, optionally using state A, and emitting elements of type R.
- It can perform one‑to‑one, one‑to‑many, many‑to‑one, or many‑to‑many transformations.
- It can maintain mutable state across elements, short‑circuit processing, and support parallel execution if given a combiner.
- You attach it using Stream.gather(gatherer).
This is analogous to Collector for terminal operations, but acts mid‑pipeline instead of at the end.
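To make the state and short-circuit points concrete, here is a minimal sketch of a limit-like gatherer. The names FirstNDemo, firstN, and Counter are hypothetical, and the sketch uses the ofSequential factory covered in section 3; it is an illustration only, not how Stream.limit is implemented.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class FirstNDemo {
    // Hypothetical helper: forwards at most n elements, then stops the pipeline.
    static <T> Gatherer<T, ?, T> firstN(long n) {
        class Counter { long seen; }          // mutable state (type A)
        return Gatherer.<T, Counter, T>ofSequential(
            Counter::new,
            (counter, element, downstream) -> {
                if (counter.seen >= n) {
                    return false;             // short-circuit: request no more input
                }
                counter.seen++;
                // continue only while downstream accepts more and the quota is not used up
                return downstream.push(element) && counter.seen < n;
            }
        );
    }

    public static void main(String[] args) {
        List<Integer> firstThree = Stream.iterate(1, i -> i + 1)  // infinite source
            .gather(firstN(3))
            .toList();
        System.out.println(firstThree); // [1, 2, 3]
    }
}
```

Because the integrator returns false once the quota is reached, the pipeline terminates even though the source is infinite.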
2. Gatherer.of – Building Parallel‑Capable Gatherers
You typically create gatherers using the static of factory methods.
A key overload is:
static <T, A, R> Gatherer<T, A, R> of(
    java.util.function.Supplier<A> initializer,
    Gatherer.Integrator<A, T, R> integrator,
    java.util.function.BinaryOperator<A> combiner,
    java.util.function.BiConsumer<A, Gatherer.Downstream<? super R>> finisher
)
2.1 Arguments
- Supplier<A> initializer – creates the mutable state A for each pipeline branch.
- Gatherer.Integrator<A, T, R> integrator – per‑element logic; updates state, may push outputs, and controls short‑circuit via its boolean return.
- BinaryOperator<A> combiner – merges two states when running in parallel.
- BiConsumer<A, Downstream<? super R>> finisher – flushes remaining state at the end of processing.
There are simpler overloads (e.g., stateless, no finisher) when you don’t need all four.
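As a sketch of how the four arguments fit together, the following many-to-one gatherer sums its input and emits a single total from the finisher. SumGathererDemo, sum, and Total are hypothetical names chosen for illustration; the explicit type arguments on Gatherer.of are there only to keep type inference simple.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class SumGathererDemo {
    // Hypothetical many-to-one gatherer: emits one element, the sum of all inputs.
    static Gatherer<Integer, ?, Integer> sum() {
        class Total { int value; }
        return Gatherer.<Integer, Total, Integer>of(
            Total::new,                                          // initializer: one state per branch
            Gatherer.Integrator.ofGreedy((total, element, downstream) -> {
                total.value += element;                          // integrator: accumulate, emit nothing yet
                return true;
            }),
            (left, right) -> {                                   // combiner: merge two branch states
                left.value += right.value;
                return left;
            },
            (total, downstream) -> downstream.push(total.value)  // finisher: emit the result
        );
    }

    public static void main(String[] args) {
        List<Integer> result = IntStream.rangeClosed(1, 100).boxed()
            .parallel()
            .gather(sum())
            .toList();
        System.out.println(result); // [5050]
    }
}
```

Addition is associative, so merging partial totals in the combiner gives the same answer whether the stream runs sequentially or in parallel.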
2.2 Example: Custom map Using Gatherer.of (Stateless, Parallelizable)
From the JDK docs, a gatherer equivalent to Stream.map can be written as:
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
    // stateless; state type is Void, no initializer/combiner needed
    return Gatherer.of(
        (Gatherer.Integrator<Void, T, R>) (state, element, downstream) -> {
            downstream.push(mapper.apply(element));
            return true; // continue
        }
    );
}
void main() {
    Stream.of("a", "bb", "ccc")
        .gather(map(String::length))
        .forEach(System.out::println);
}
- The gatherer is parallelizable because we used of, and it’s stateless (Void state).
- Rationale: for a simple one‑to‑one transformation, no state or finisher is needed; the integrator only pushes mapped elements.
3. Gatherer.ofSequential – Sequential‑Only Gatherers
For logic that is inherently sequential or where you don’t care about parallel execution, you use ofSequential.
Typical overloads:
static <T, R> Gatherer<T, Void, R> ofSequential(
    Gatherer.Integrator<Void, T, R> integrator
)
static <T, A, R> Gatherer<T, A, R> ofSequential(
    java.util.function.Supplier<A> initializer,
    Gatherer.Integrator<A, T, R> integrator
)
static <T, A, R> Gatherer<T, A, R> ofSequential(
    java.util.function.Supplier<A> initializer,
    Gatherer.Integrator<A, T, R> integrator,
    java.util.function.BiConsumer<A, Gatherer.Downstream<? super R>> finisher
)
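- These gatherers are explicitly sequential: no combiner is provided, so the gatherer stage itself is always evaluated sequentially. It can still be attached to a parallel stream; only this stage gives up parallelism.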
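The three-argument overload is useful when some state is still pending after the last element. A minimal sketch, assuming a hypothetical run-length gatherer (RunLengthDemo, Run, runLengths, and State are illustrative names, not JDK API): the integrator emits a run whenever the value changes, and the finisher flushes the final run.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class RunLengthDemo {
    // Hypothetical result type: a value and how many times it repeated consecutively.
    record Run<T>(T value, int count) {}

    static <T> Gatherer<T, ?, Run<T>> runLengths() {
        class State { T value; int count; }
        return Gatherer.<T, State, Run<T>>ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.count > 0 && Objects.equals(state.value, element)) {
                    state.count++;                 // same run continues
                    return true;
                }
                boolean wantsMore = true;
                if (state.count > 0) {             // a run just ended: emit it
                    wantsMore = downstream.push(new Run<T>(state.value, state.count));
                }
                state.value = element;             // start a new run
                state.count = 1;
                return wantsMore;
            }),
            (state, downstream) -> {               // finisher: flush the last open run
                if (state.count > 0) {
                    downstream.push(new Run<T>(state.value, state.count));
                }
            }
        );
    }

    public static void main(String[] args) {
        List<Run<String>> runs = Stream.of("a", "a", "b", "c", "c", "c")
            .gather(runLengths())
            .toList();
        System.out.println(runs);
        // [Run[value=a, count=2], Run[value=b, count=1], Run[value=c, count=3]]
    }
}
```

Without the finisher, the last run (here the three "c" elements) would never be emitted, because no later element arrives to close it.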
3.1 Example: Prefix Scan Using ofSequential
The JDK docs show a prefix scan implemented with ofSequential:
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public static <T, R> Gatherer<T, ?, R> scan(
    Supplier<R> initial,
    BiFunction<? super R, ? super T, ? extends R> scanner
) {
    class State {
        R current = initial.get();
    }
    return Gatherer.<T, State, R>ofSequential(
        State::new,
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            state.current = scanner.apply(state.current, element);
            return downstream.push(state.current); // emit new prefix
        })
    );
}
void main() {
    var numberStrings =
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .gather(scan(() -> "", (string, number) -> string + number))
            .toList();
    System.out.println(numberStrings);
}
- Result: the list ["1", "12", "123", ... "123456789"] (println shows the strings without quotes).
- Rationale: prefix scan is inherently order‑sensitive and naturally modeled as sequential; ofSequential expresses that contract directly.
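The JDK also ships this operation ready-made as Gatherers.scan, so a hand-written version is mainly useful for learning. A minimal sketch of running totals with the built-in gatherer follows; BuiltInScanDemo and runningSums are hypothetical names.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

public class BuiltInScanDemo {
    // Hypothetical helper: running totals via the built-in Gatherers.scan.
    static List<Integer> runningSums(List<Integer> numbers) {
        Gatherer<Integer, ?, Integer> runningSum =
            Gatherers.scan(() -> 0, (sum, n) -> sum + n);
        return numbers.stream().gather(runningSum).toList();
    }

    public static void main(String[] args) {
        System.out.println(runningSums(List.of(1, 2, 3, 4))); // [1, 3, 6, 10]
    }
}
```

As in the hand-written version, the initial value seeds the state but is not itself emitted.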
4. Declaring a Sink Gatherer
Example of a “log‑only” gatherer that never forwards elements:
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public static Gatherer<String, ?, String> loggingSink() {
    return Gatherer.ofSequential(
        (Gatherer.Integrator<Void, String, String>) (state, element, downstream) -> {
            System.out.println("LOG: " + element);
            // Don't push anything downstream - just log and continue
            return true;
        }
    );
}
void main() {
    Stream.of("one", "two", "three")
        .gather(loggingSink())
        .forEach(s -> System.out.println("Downstream got: " + s)); // prints nothing downstream
}
- Here, the downstream will see nothing; the only observable effect is the logging side‑effect.
5. Built‑In Gatherer: windowSliding
The Gatherers.windowSliding method provides sliding windows as lists.
Signature:
static <T> java.util.stream.Gatherer<T, ?, java.util.List<T>> windowSliding(int windowSize)
Behavior:
- Produces overlapping windows of size windowSize in encounter order.
- Each new window drops the oldest element and adds the next.
- If the stream is empty, no windows; if shorter than windowSize, one window containing all elements.
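The edge cases in the last bullet can be checked with a short sketch. WindowSlidingEdgeCases and its windows helper are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowSlidingEdgeCases {
    // Hypothetical helper that collects the sliding windows of a stream into a list.
    static <T> List<List<T>> windows(Stream<T> source, int windowSize) {
        return source.gather(Gatherers.windowSliding(windowSize)).toList();
    }

    public static void main(String[] args) {
        System.out.println(windows(Stream.<Integer>empty(), 3)); // []
        System.out.println(windows(Stream.of(1, 2), 3));         // [[1, 2]]
        System.out.println(windows(Stream.of(1, 2, 3, 4), 3));   // [[1, 2, 3], [2, 3, 4]]
    }
}
```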
5.1 Example: Sliding Windows of Integers
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
void main() {
    List<List<Integer>> windows =
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
            .gather(Gatherers.windowSliding(3))
            .toList();
    windows.forEach(System.out::println);
}
Expected result: [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]].
Rationale:
- Sliding windows are a classic stateful pattern that requires remembering the last windowSize - 1 elements.
- Implementing this manually with map/flatMap is error‑prone; windowSliding encapsulates it as a reusable gatherer.
6. Built‑In Gatherer: mapConcurrent
mapConcurrent applies a function concurrently using virtual threads while preserving stream order.
Signature:
static <T, R> java.util.stream.Gatherer<T, ?, R> mapConcurrent(
    int maxConcurrency,
    java.util.function.Function<? super T, ? extends R> mapper
)
Behavior:
- Executes mapper concurrently with up to maxConcurrency in‑flight tasks.
- Uses virtual threads (Loom), so it scales well for blocking tasks.
- Preserves encounter order when emitting results downstream.
- Attempts to cancel in‑progress tasks when downstream no longer wants more elements.
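The ordering guarantee is easiest to see when later elements finish first. In this minimal sketch, MapConcurrentOrderDemo, slowUpper, and run are hypothetical names, and the sleep durations are arbitrary stand-ins for blocking work.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class MapConcurrentOrderDemo {
    // Hypothetical task: shorter inputs sleep longer, so later elements finish first.
    static String slowUpper(String s) {
        try {
            Thread.sleep(100L * (4 - s.length()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.toUpperCase();
    }

    static List<String> run() {
        return Stream.of("a", "bb", "ccc")
            .gather(Gatherers.mapConcurrent(3, MapConcurrentOrderDemo::slowUpper))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A, BB, CCC]
    }
}
```

Although "ccc" completes before "a", results are emitted downstream in encounter order.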
6.1 Example: Concurrent “Remote” Work
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
public class MapConcurrentDemo {
    private static String fetchRemote(String id) {
        try {
            Thread.sleep(300); // simulate blocking IO
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response-for-" + id + " on " + Thread.currentThread();
    }
    static void main() {
        List<String> responses =
            Stream.of("A", "B", "C", "D", "E")
                .gather(Gatherers.mapConcurrent(3, MapConcurrentDemo::fetchRemote))
                .toList();
        responses.forEach(System.out::println);
    }
}
- Up to 3 virtual threads will run fetchRemote concurrently.
- The result list preserves the order "A", "B", "C", "D", "E".
Rationale:
- Compared to parallel(), mapConcurrent is explicit about its concurrency level, is suited to blocking IO, and emits results in encounter order, making it a better fit for IO‑bound work such as remote calls.
7. Putting It All Together
You now have:
- Gatherer.of to build parallel‑capable gatherers when you need full control, including a combiner and finisher.
- Gatherer.ofSequential for simpler or inherently sequential logic, with examples like prefix scan.
- Gatherers.windowSliding and Gatherers.mapConcurrent as high‑level, ready‑made gatherers for windowing and concurrent mapping.
With these building blocks, you can design expressive, stateful, and performance‑aware stream pipelines using the latest Java Stream API.