Reactive streams are gaining traction in mainstream programming, and Java has its own implementation via the Flow API (java.util.concurrent.Flow). Popular reactive stream implementations include RxJava, Reactor, and Akka.
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;
public class Main {

    /**
     * Sample subscriber implementation that prints each item it receives and
     * pulls items from the publisher one at a time.
     */
    public static class Subscriber implements Flow.Subscriber<Integer> {

        /**
         * The subscription received in {@link #onSubscribe}; used to request
         * only as many items as we can handle.
         */
        private Flow.Subscription subscription;

        /**
         * Tracks whether the stream has terminated, either normally or with an error.
         * Declared {@code volatile} because it is written by the thread that delivers
         * the terminal signal and polled by a different thread (see {@code main}).
         */
        private volatile boolean isDone;

        /**
         * Triggered on the initial subscription. Stores the subscription and
         * requests the first item.
         *
         * @param subscription An instance of Flow.Subscription.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Subscribed");
            this.subscription = subscription;
            this.subscription.request(1);
        }

        /**
         * Does the actual processing, then asks the publisher for the next item.
         *
         * @param item The actual item currently being processed.
         */
        @Override
        public void onNext(Integer item) {
            System.out.println("Processing " + item);
            this.subscription.request(1);
        }

        /**
         * Handles a failed stream by printing the stack trace.
         *
         * @param throwable An instance of Throwable.
         */
        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            // An error is a terminal signal too: without this, anyone waiting on
            // the done flag would wait forever after a failure.
            isDone = true;
        }

        /**
         * Called when the publisher was closed or completed.
         */
        @Override
        public void onComplete() {
            System.out.println("Processing done.");
            isDone = true;
        }

        /**
         * Reports whether a terminal signal has been received.
         *
         * @return {@code true} once {@link #onComplete()} or {@link #onError(Throwable)} has run.
         */
        public boolean isDone() {
            return isDone;
        }
    }

    /**
     * Publishes the integers 1 to 10 to a {@link Subscriber} and waits until the
     * subscriber has seen the end of the stream.
     *
     * @param args Command line arguments (unused).
     * @throws InterruptedException If the thread is interrupted while waiting.
     */
    public static void main(String[] args) throws InterruptedException {
        // The sample subscriber implementation.
        Subscriber subscriber = new Subscriber();

        // The publisher of the data. try-with-resources guarantees close() is called,
        // and therefore the terminal signal is sent, even if publishing throws.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Register a subscriber.
            publisher.subscribe(subscriber);
            // Publish the sample stream data.
            IntStream.rangeClosed(1, 10).forEach(publisher::submit);
        }

        // Since the processing is asynchronous, wait for everything to be processed.
        while (!subscriber.isDone()) {
            Thread.sleep(10);
        }
        System.out.println("Done");
    }
}
Recent Comments