Java 9 Reactive Streams With Examples

Java 9 Reactive Streams allows us to implement non-blocking asynchronous stream processing. This is a major step towards applying reactive programming model to core java programming.

If you are new to reactive programming, please read Reactive Manifesto and go through short notes on Reactive Streams. RxJava and Akka Streams have been the popular implementation of reactive streams. Now java 9 has introduced reactive streams support through java.util.concurrent.Flow API.

Java 9 Reactive Streams

Reactive Streams is about asynchronous processing of stream, so there should be a Publisher and a Subscriber. The Publisher publishes the stream of data and the Subscriber consumes the data.

java-9-reactive-streams

Sometimes we have to transform the data between Publisher and Subscriber. Processor is the entity sitting between the end publisher and subscriber to transform the data received from publisher so that subscriber can understand it. We can have a chain of processors.

java-9-publisher-subscriber-processor

It’s very clear from the above image that Processor works both as Subscriber and a Publisher.

Java 9 Flow API

Java 9 Flow API implements the Reactive Streams Specification. Flow API is a combination of Iterator and Observer pattern. Iterator works on pull model where application pulls items from the source, whereas Observer works on push model and reacts when item is pushed from source to application.

Java 9 Flow API subscriber can request for N items while subscribing to the publisher. Then the items are pushed from publisher to subscriber until there are no more items left to push or some error occurs.

java-9-flow-api

Java 9 Flow API Classes and Interfaces

Let’s have a quick look at Flow API classes and interfaces.

  • java.util.concurrent.Flow: This is the main class of Flow API. This class encapsulates all the important interfaces of the Flow API. This is a final class and we can’t extend it.
  • java.util.concurrent.Flow.Publisher: This is a functional interface and every publisher has to implement it’s subscribe method to add the given subscriber to receive messages.
  • java.util.concurrent.Flow.Subscriber: Every subscriber has to implement this interface. The methods in the subscriber are invoked in strict sequential order. There are four methods in this interface:
    1. onSubscribe: This is the first method to get invoked when subscriber is subscribed to receive messages by publisher. Usually we invoke subscription.request to start receiving items from processor.
    2. onNext: This method gets invoked when an item is received from publisher, this is where we implement our business logic to process the stream and then request for more data from publisher.
    3. onError: This method is invoked when an irrecoverable error occurs, we can do cleanup taks in this method, such as closing database connection.
    4. onComplete: This is like finally method and gets invoked when no other items are being produced by publisher and publisher is closed. We can use it to send notification of successful processing of stream.
  • java.util.concurrent.Flow.Subscription: This is used to create asynchronous non-blocking link between publisher and subscriber. Subscriber invokes its request method to demand items from publisher. It also has cancel method to cancel the subscription i.e. closing the link between publisher and subscriber.
  • java.util.concurrent.Flow.Processor: This interface extends both Publisher and Subscriber, this is used to transform the message between publisher and subscriber.
  • java.util.concurrent.SubmissionPublisher: A Publisher implementation that asynchronously issues submitted items to current subscribers until it is closed. It uses Executor framework We will use this class in reactive stream examples to add subscriber and then submit items to them.

Java 9 Reactive Stream Example

Let’s start with a simple example where we will implement Flow API Subscriber interface and use SubmissionPublisher to create publisher and send messages.

Stream Data

Let’s say we have an Employee class that will be used to create the stream message to be sent from publisher to subscriber.

We also have a utility class to create a list of employees for our example.

Subscriber

  • Subscription variable to keep reference so that request can be made in onNext method.
  • counter variable to keep count of number of items processed, notice that it’s value is increased in onNext method. This will be used in our main method to wait for execution to finish before ending the main thread.
  • Subscription request is invoked in onSubscribe method to start the processing. Also notice that it’s again called in onNext method after processing the item, demanding next item to process from the publisher.
  • onError and onComplete doesn’t have much here, but in real world scenario they should be used to perform corrective measures when error occurs or cleanup of resources when processing completes successfully.

Reactive Stream Test Program

We will use SubmissionPublisher as Publisher for our examples, so let’s look at the test program for our reactive stream implementation.

The most important piece of above code is subscribe and submit methods invocation of publisher. We should always close publisher to avoid any memory leaks.

We will get following output when above program is executed.

Note that if we won’t have logic for main method to wait before all the items are processed, then we will get unwanted results.

Message Transformation Example

Processor is used to transform the message between a publisher and subscriber. Let’s say we have another subscriber which is expecting a different type of message to process. Let’s say this new message type is Freelancer.

We have a new subscriber to consume Freelancer stream data.

Processor

The important part is the implementation of Processor interface. Since we want to utilize the SubmissionPublisher, we would extend it and use it wherever applicable.

  • Function will be used to convert Employee object to Freelancer object.
  • We will convert incoming Employee message to Freelancer message in onNext method and then use SubmissionPublisher submit method to send it to the subscriber.
  • Since Processor works as both subscriber and publisher, we can create a chain of processors between end publishers and subscribers.

Message Transformation Test

Read the comments in the program to properly understand it, most important change is the creation of producer-processor-subscriber chain. We will get following output when above program is executed.

Cancel Subscription

We can use Subscription cancel method to stop receiving message in subscriber. Note that if we cancel the subscription, then subscriber will not receive onComplete or onError signal.

Here is a sample code where subscriber is consuming only 3 messages and then canceling the subscription.

Note that in this case, our logic to halt main thread before all the messages are processed will go into infinite loop. We can add some additional logic for this scenario, may be some global variable to look for if subscriber has stopped processing or canceled subscription.

Back Pressure

When publisher is producing messages in much faster rate than it’s being consumed by subscriber, back pressure gets built. Flow API doesn’t provide any mechanism to signal about back pressure or to deal with it. But we can devise our own strategy to deal with it, such as fine tuning the subscriber or reducing the message producing rate. You can read how RxJava deals with Back Pressure.

Summary

Java 9 Flow API is a good move towards reactive programming and to create asynchronous non-blocking application. However creating a true reactive application is possible only when all the systems API support it.

You can download the example code used in the tutorial from my GitHub Project.

By admin

Leave a Reply

%d bloggers like this: