Flux Sharing in Project Reactor: From one to many

Reactive applications often need a single Reactive Stream to be consumed by multiple Subscribers at the same time, for example when the same stream of information is used by different components of the application, or when the data has to be delivered to several users. This is a big advantage in real-time applications: think of a chatroom where every message has to reach all users at the same time.

As we saw in the article TopicProcessor: Spreading the Message to Many, a TopicProcessor can do this. It requires a Thread per consumer, however, and is therefore intended for specific scenarios only. With a large number of consumers it won’t scale well, because the application has to do a lot of thread switching.

In the more general case of splitting one Reactive Stream across multiple Subscribers while still using Project Reactor’s event loop system, you can simply “share” your Reactive Stream. Calling share() on a Flux returns a new Flux that multicasts to all of its Subscribers. It is shorthand for publish().refCount(); publish() on its own is what returns a ConnectableFlux. Subscribers can subscribe to the shared Flux directly, or new Fluxes can be created with the static “from” method, which takes the multicasting Flux as an argument. When new information runs through the original Flux, it is passed on to every Flux created this way. Note that the shared Flux is hot: a Subscriber only receives the elements emitted after it subscribed.
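The multicasting behaviour of share() can be seen in a minimal sketch like the one below. It is not taken from the demo project: the class name ShareSketch is hypothetical, and the source is driven by hand through a Sinks.Many (which assumes Reactor 3.4 or later) instead of Flux.interval, purely so that the timing is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class ShareSketch {

    // Returns what two subscribers saw: index 0 = early subscriber, index 1 = late subscriber.
    static List<List<Long>> run() {
        List<Long> early = new ArrayList<>();
        List<Long> late = new ArrayList<>();

        // Hand-driven source (assumption: Reactor 3.4+ Sinks API), shared between subscribers.
        Sinks.Many<Long> source = Sinks.many().unicast().onBackpressureBuffer();
        Flux<Long> shared = source.asFlux().share();

        // The first subscription connects the shared Flux to the source.
        shared.subscribe(early::add);
        source.tryEmitNext(0L);
        source.tryEmitNext(1L);

        // The second subscriber joins the running stream and misses 0 and 1.
        shared.subscribe(late::add);
        source.tryEmitNext(2L);
        source.tryEmitNext(3L);
        source.tryEmitComplete();

        return List.of(early, late);
    }

    public static void main(String[] args) {
        List<List<Long>> seen = run();
        System.out.println("early subscriber: " + seen.get(0));
        System.out.println("late subscriber:  " + seen.get(1));
    }
}
```

Both subscribers are served by a single subscription to the source, and the late subscriber only sees the values emitted after it joined. If every subscriber must see every value, one option is to use publish() and call connect() on the resulting ConnectableFlux once all subscribers are registered.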

Let’s look at a simple code example. We have a Flux that counts up at a fixed interval. The Flux is shared by 50,000 subscribing Fluxes, each of which prints the values it receives to the command line. Every subscribing Flux gets a subscriptionNumber, so the command line output shows in which order they print their incoming data.

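int numberOfSubscriptions = 50_000;

// share() turns the interval into a multicasting Flux; it starts emitting on the first subscription.
Flux<Long> startFlux = Flux.interval(Duration.ofMillis(1000)).share();

for (int i = 0; i < numberOfSubscriptions; i++) {
    final int subscriptionNumber = i;
    // Each output Flux wraps the same shared source.
    Flux<Long> outputFlux = Flux.from(startFlux);
    outputFlux.subscribe(out -> System.out.println("Flux " + subscriptionNumber + " " + out));
}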
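On its own, the snippet above does not keep the JVM alive: Flux.interval emits on a daemon scheduler thread, so a main method that only subscribes would return before anything is printed. A complete program could look roughly like the sketch below. The take(...) limit and the CountDownLatch are assumptions and are not shown in the article; the class name SharedIntervalDemo and the run(...) helper with its parameters are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

public class SharedIntervalDemo {

    // Subscribes the given number of consumers to one shared interval, blocks until
    // every consumer has terminated, and returns the total number of values received.
    static long run(int subscribers, int elements, Duration period, boolean print)
            throws InterruptedException {
        AtomicLong received = new AtomicLong();
        CountDownLatch allDone = new CountDownLatch(subscribers);

        // Assumption: the stream is bounded with take(...) so that it completes.
        Flux<Long> startFlux = Flux.interval(period).take(elements).share();

        for (int i = 0; i < subscribers; i++) {
            final int subscriptionNumber = i;
            startFlux.subscribe(
                out -> {
                    received.incrementAndGet();
                    if (print) {
                        System.out.println("Flux " + subscriptionNumber + " " + out);
                    }
                },
                error -> allDone.countDown(),   // count down on failure as well
                allDone::countDown);            // count down on normal completion
        }

        allDone.await();                        // keep the calling thread alive
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        run(50_000, 100, Duration.ofMillis(1000), true);
    }
}
```

Because share() connects to the interval as soon as the first subscriber arrives, a subscriber that is registered after the first tick has fired will not see the earlier values. With a one-second period this is unlikely to matter for most of the loop, but it is worth keeping in mind when the period is short or the number of subscribers is large.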

When running the code, we see the different Fluxes printing smoothly to the command line, ending with something similar to:

Flux 49990 99
Flux 49991 99
Flux 49992 99
Flux 49993 99
Flux 49994 99
Flux 49995 99
Flux 49996 99
Flux 49997 99
Flux 49998 99
Flux 49999 99
Process finished with exit code 0

This shows that every subscribing Flux receives the values flowing out of the original Flux, as we intended. The technique has clear architectural advantages. When a lot of users require the same information, a single source can serve all of them at the same time. The same holds in microservice architectures where you want to send the same information to many other systems concurrently, without blocking on I/O, which helps to achieve a high throughput.

The code for this demo can be found on GitHub.