Combining Monos in Project Reactor

In Project Reactor, a Mono is a publisher that asynchronously signals at most one element, or an error, at some point in the future. Several Monos can be in flight at the same time, without blocking any threads while their results arrive.
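As a minimal sketch of those two outcomes, the class below builds one Mono that emits a value and one that signals an error. The findPigName method and its "pig-" labels are hypothetical and only serve the illustration; Mono.fromSupplier, Mono.error and the two-argument subscribe are regular Reactor calls.

```java
import reactor.core.publisher.Mono;

public class MonoBasics {

    // Hypothetical lookup: emits a name for a positive id, otherwise signals an error.
    static Mono<String> findPigName(long id) {
        if (id <= 0) {
            return Mono.error(new IllegalArgumentException("unknown pig id " + id));
        }
        // fromSupplier defers the work until someone subscribes.
        return Mono.fromSupplier(() -> "pig-" + id);
    }

    public static void main(String[] args) {
        // The first callback receives the value, the second one the error.
        findPigName(12L).subscribe(
                name -> System.out.println("value: " + name),
                error -> System.out.println("error: " + error.getMessage()));
        // prints "value: pig-12"

        findPigName(-1L).subscribe(
                name -> System.out.println("value: " + name),
                error -> System.out.println("error: " + error.getMessage()));
        // prints "error: unknown pig id -1"
    }
}
```

Neither Mono here involves a scheduler, so the callbacks run on the calling thread; with a real WebClient or database call they would typically run later, on another thread.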

There are several ways to combine these Monos. We’ll look over the most useful ones.

Concatenation of Monos

When you have two Monos of the same type, you can concatenate them into a Flux. The Flux emits the results of the Monos in the order in which they were passed as arguments, forming a sequence of results. The Monos are lazy: none of them is subscribed to until someone subscribes to the resulting Flux.

In this example we twice call a “getPig” method that returns a Mono. Using the “Flux.concat” method, we create a new Flux that is the concatenation of the two Monos. When subscribed to, this Flux subscribes to the Monos one at a time, moving on to the next Mono only after the previous one has completed. We then declare that the sendPigInformation method must be called for every result, and subscribe to the Flux. The subscription triggers the concatenation, which in turn subscribes to the given Monos, which might cause a WebClient or database call. As the pig results stream in through the Monos, the pigs Flux causes the pig information to be sent.

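import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Pig is assumed to be the element type emitted by getPig.
public void sendPigInformation() {
    final Mono<Pig> pigA = getPig(12L);
    final Mono<Pig> pigB = getPig(25L);

    // Assembling the Flux does not subscribe to either Mono yet.
    final Flux<Pig> pigs = Flux.concat(pigA, pigB);

    // Calls the sendPigInformation(Pig) overload per pig; subscribe() starts the pipeline.
    pigs.doOnNext(this::sendPigInformation).subscribe();
}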
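
To make the ordering visible, here is a self-contained sketch in which the first Mono is deliberately slower than the second. The getPig stand-in, its delays and the log list are assumptions made for the demo, not part of the original service; the log records when each Mono is subscribed to and when its value arrives.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatOrderSketch {

    // Hypothetical stand-in for getPig: emits a label after a delay and
    // records the moment it is subscribed to.
    static Mono<String> getPig(long id, long delayMillis, List<String> log) {
        return Mono.just("pig-" + id)
                .delayElement(Duration.ofMillis(delayMillis))
                .doOnSubscribe(subscription -> log.add("subscribed " + id));
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();

        Flux<String> pigs = Flux.concat(
                getPig(12L, 200, log),  // slow, but first in line
                getPig(25L, 10, log));  // fast, but has to wait its turn

        // The log is still empty here: assembling the Flux subscribes to nothing.
        pigs.doOnNext(pig -> log.add("received " + pig))
                .blockLast();           // blocking only to keep the demo short
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log should show that pig 25 is only subscribed to after pig 12 has been received, even though pig 25 would have answered faster.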

Warning: If one of the Monos signals an error, that error is passed on to the Flux, which then terminates. Any Monos that come later in the concatenation are never subscribed to.
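
The following sketch illustrates that behaviour under simple assumptions: the three Monos and their messages are made up for the demo, and the middle one always fails. The supplier of the last Mono writes to the log, so we can check that it never ran.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatErrorSketch {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();

        Mono<String> pigA = Mono.just("pig-12");
        // Hypothetical failure, for instance a lookup that finds nothing.
        Mono<String> broken = Mono.error(new IllegalStateException("pig 13 not found"));
        Mono<String> pigB = Mono.fromSupplier(() -> {
            log.add("pigB was called"); // not reached: pigB is never subscribed to
            return "pig-25";
        });

        Flux.concat(pigA, broken, pigB)
                .subscribe(
                        pig -> log.add("received " + pig),
                        error -> log.add("failed: " + error.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // prints "received pig-12" and then "failed: pig 13 not found"
    }
}
```

If the remaining Monos should still run after a failure, Reactor offers Flux.concatDelayError, which postpones the error until all sources have been processed.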

Merging of Monos

Two Monos of the same type, or even of different types, can be combined into a single Mono by merging them, which Reactor calls zipping. This way we can collect multiple asynchronous results and aggregate them into a new result.

An example of this can be seen below, where we fetch the number of pigs and the farmer’s name. We combine the results using the zipWith operator. It takes the other Mono and a combinator function that turns the two results into the resulting output. In this case the results of the numberOfPigs and farmerName Monos are used as arguments for the FarmInformation constructor.

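import reactor.core.publisher.Mono;

class FarmInformation {
    final Long numberOfPigs;
    final String farmerName;

    public FarmInformation(Long numberOfPigs, String farmerName) {
        this.numberOfPigs = numberOfPigs;
        this.farmerName = farmerName;
    }
}

// countPigs() is assumed to return Mono<Long> and getFarmerName() Mono<String>,
// matching the constructor arguments above.
public Mono<FarmInformation> printFarmInformation() {
    final Mono<Long> numberOfPigs = countPigs();
    final Mono<String> farmerName = getFarmerName();

    // The constructor is invoked once both Monos have emitted their value.
    return numberOfPigs.zipWith(farmerName, FarmInformation::new);
}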

This method returns a Mono of FarmInformation, which can be subscribed to or used to build further on the reactive pipeline.
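
For readers who want to run the idea in isolation, here is a small sketch that uses a lambda as the combinator instead of a constructor reference. The countPigs and getFarmerName bodies, their values and their delays are assumptions standing in for real asynchronous calls.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class ZipSketch {

    // Hypothetical sources that answer after different delays.
    static Mono<Long> countPigs() {
        return Mono.just(42L).delayElement(Duration.ofMillis(100));
    }

    static Mono<String> getFarmerName() {
        return Mono.just("Old MacDonald").delayElement(Duration.ofMillis(50));
    }

    // Both Monos are subscribed to together; the lambda runs once both have emitted.
    static Mono<String> describeFarm() {
        return countPigs().zipWith(getFarmerName(),
                (count, name) -> name + " owns " + count + " pigs");
    }

    public static void main(String[] args) {
        // block() is used only to keep the demo short.
        System.out.println(describeFarm().block());
        // prints "Old MacDonald owns 42 pigs"
    }
}
```

Keep in mind that the zipped Mono only emits when both sources emit a value: if either source signals an error or completes empty, the result errors or completes empty as well.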

In this article we saw how to combine Monos by concatenating them into a Flux, and how to merge them into a new Mono.