TopicProcessor in Project Reactor: Spreading the message to many

When building your Reactive Application, you sometimes want to split up your single stream into several ones, running concurrently. One of the ways this can be done using the TopicProcessor.

Very similary to the concept of a Topic seen in JMS, a TopicProcessor can be used to subscribe on one or multiple streams, and have several streams subscribe to it. By doing so, you can create multiple streams out of a single one. These streams will all run on their own threads. This means you can use the TopicProcessor to go from multiple input threads to multiple output threads.

A simple code example can be seen here, where we have a Flux counting up in an interval, which is then assigned to a TopicProcessor. Next, 50 Fluxes are created based on the topicProcessor, so they are subscribed to it. Afterwards, a simple subscription is done on all the created fluxes, who then all concurrently get the info originally streamed from the original Flux Stream.

1
2
3
4
5
6
7
8
9
final TopicProcessor topicProcessor = TopicProcessor.create();

Flux.interval(Duration.ofMillis(1000)).subscribe(topicProcessor);

    for(int i = 0; i < numberOfSubscriptions; i++) {
    final int subscriptionNumber = i;
    Flux outputFlux = Flux.from(topicProcessor);
    outputFlux.subscribe(out -> System.out.println("Flux " + subscriptionNumber + " " + out));
}

A word of warning, however. Since the TopicProcessor will create a new Thread for every subscriber, the above piece of code will result into the creation of 50 additional threads, to supply all the subscribers. This can of course cause performance issues. In case multiple threads aren’t required, or you want the threading to be handled by Project Reactor, it’s better to share your Flux.

The code for this demo can be found on Github