Combining Fluxes in Project Reactor

Analogous to combining Monos, which we discussed in a previous article, we can also combine Fluxes in several ways.

With Project Reactor, a Flux can emit zero, one, or many elements. This makes it inherently different from the Mono datatype, which emits at most one element. As a result, Fluxes typically “live longer” than Monos: receiving one element from a Flux does not mean that no more will follow. Only when a Flux sends its onComplete or onError signal is it done emitting results.
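To make the signal sequence concrete, here is a minimal sketch that records every signal a three-element Flux sends to its subscriber. The class and method names are made up for illustration; only `Flux.just` and the three-argument `subscribe` overload come from Reactor.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

public class FluxSignalsExample {

    // Subscribes to a Flux of three elements and records each signal it receives.
    static List<String> fluxSignals() {
        final List<String> signals = new ArrayList<>();

        Flux.just("A", "B", "C")
            .subscribe(
                value -> signals.add("onNext(" + value + ")"), // one call per element
                error -> signals.add("onError"),               // terminal signal on failure
                () -> signals.add("onComplete"));              // terminal signal on success

        // Flux.just emits synchronously, so the list is already filled here.
        return signals;
    }

    public static void main(String[] args) {
        // prints [onNext(A), onNext(B), onNext(C), onComplete]
        System.out.println(fluxSignals());
    }
}
```

The Flux keeps sending onNext signals until it terminates with exactly one of onComplete or onError.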

Because Fluxes are more dynamic in this sense, there are also more methods of combining them. In this article, we’ll look into the most useful ways to combine Fluxes.

Concatenating Fluxes

As with Monos, two Fluxes of compatible element types (which, unlike Monos, may emit different numbers of elements) can be concatenated into a single resulting Flux. The resulting Flux emits the elements of the two given Fluxes in the given order: all elements of the first Flux, followed by all elements of the second.

Besides the static “Flux.concat” method, we can also call the “concatWith” method on one of the target Fluxes, effectively chaining them together in a cleaner-looking way.

In the following example, we’ll get two Fluxes of pigs and concatenate them. When using concatenation, the second Flux is only subscribed to after the first Flux emits its “onComplete” signal. This means that they run sequentially, instead of in parallel.

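public void sendPigInformation() {
    // Pig is the element type assumed here for both helper methods.
    final Flux<Pig> oldPigs = getOldPigs(12L);
    final Flux<Pig> youngPigs = getYoungPigs(25L);

    // youngPigs is only subscribed to once oldPigs has completed.
    final Flux<Pig> pigs = oldPigs.concatWith(youngPigs);

    // Calls the sendPigInformation(Pig) overload for every pig.
    pigs.doOnNext(this::sendPigInformation).subscribe();
}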
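The sequential subscription described above can be observed in a small self-contained sketch. It uses plain strings instead of pigs, and the class name, method name, and delay value are assumptions chosen for illustration. `doOnSubscribe` records the moment each source is subscribed to.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;

public class ConcatExample {

    // Returns the subscription and element events in the order they happened.
    static List<String> concatEvents() {
        final List<String> events = new CopyOnWriteArrayList<>();

        // The first source is deliberately slow to show that the second one waits.
        final Flux<String> oldPigs = Flux.just("old-1", "old-2")
            .delayElements(Duration.ofMillis(50))
            .doOnSubscribe(subscription -> events.add("subscribed to oldPigs"));

        final Flux<String> youngPigs = Flux.just("young-1")
            .doOnSubscribe(subscription -> events.add("subscribed to youngPigs"));

        oldPigs.concatWith(youngPigs)
            .doOnNext(events::add)
            .blockLast(); // blocking is only acceptable here because this is a demo

        return events;
    }

    public static void main(String[] args) {
        // prints [subscribed to oldPigs, old-1, old-2, subscribed to youngPigs, young-1]
        System.out.println(concatEvents());
    }
}
```

Even though youngPigs could emit immediately, it is not subscribed to until oldPigs has completed.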

Merging Fluxes

Merging Fluxes is very similar to concatenating them. We take two or more Fluxes, which may emit different numbers of elements, and combine their results into a single resulting Flux. The difference between concatenating and merging, however, is that the upstream Fluxes are subscribed to eagerly rather than one after the other. This means that although the relative order of elements coming from the same source Flux won’t change, we can make no assumptions about how the elements of the different Fluxes are interleaved. If we had one Flux returning “A”, “B”, “C”, and another one returning “D”, “E”, “F”, we might get a combination like [“A”, “B”, “D”, “C”, “E”, “F”], or [“D”, “A”, “B”, “C”, “E”, “F”], and so on.

In this example, we can expect a mix of old and young pigs being passed to the sendPigInformation method.

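public void sendPigInformation() {
    // Pig is the element type assumed here for both helper methods.
    final Flux<Pig> oldPigs = getOldPigs(12L);
    final Flux<Pig> youngPigs = getYoungPigs(25L);

    // Both sources are subscribed to eagerly; their elements may interleave.
    final Flux<Pig> pigs = Flux.merge(oldPigs, youngPigs);

    // Calls the sendPigInformation(Pig) overload for every pig.
    pigs.doOnNext(this::sendPigInformation).subscribe();
}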
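The interleaving can be tried out with a small self-contained sketch that reuses the letters from the description above. The class name, method name, and delay values are assumptions for illustration. The exact interleaving depends on timing, so the sketch does not promise one particular output, only that each source keeps its own order.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class MergeExample {

    // Merges two delayed sources and collects whatever order they arrive in.
    static List<String> mergedLetters() {
        final Flux<String> first = Flux.just("A", "B", "C")
            .delayElements(Duration.ofMillis(100));
        final Flux<String> second = Flux.just("D", "E", "F")
            .delayElements(Duration.ofMillis(70));

        // Both sources are subscribed to right away; elements are emitted as they arrive.
        return Flux.merge(first, second)
            .collectList()
            .block(); // blocking is only acceptable here because this is a demo
    }

    public static void main(String[] args) {
        // The interleaving may differ between runs, but A, B, C and D, E, F
        // each stay in their original relative order.
        System.out.println(mergedLetters());
    }
}
```

The same result can be written with the instance method as `first.mergeWith(second)`.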

Zipping Fluxes

When dealing with Fluxes of different types, you can use zipping to combine them. The elements coming from the different Fluxes are paired up one by one and passed to a combinator callback, until one of the two Fluxes completes. A word of warning: this means that elements which have no counterpart to be combined with are simply “forgotten”. Both target Fluxes are subscribed to the moment we subscribe to the resulting Flux.

Similar to merging and concatenating, we can either use the “zipWith” method, or use the static “Flux.zip” method.

In the following example, we’ll combine two different Fluxes by zipping their elements together one by one.

public void printFarmInformation() {
    final Flux<String> letters = Flux.just("A", "B", "C", "D", "E");
    final Flux<Integer> numbers = Flux.just(1, 2, 3);

    // Pairs the n-th letter with the n-th number; stops when numbers completes.
    final Flux<String> lettersAndNumbers = letters.zipWith(numbers, (l, n) -> l + n);

    lettersAndNumbers.subscribe(System.out::println);
}

As we can see in the command line output, the first three elements of both Fluxes get zipped together, but the remaining items of the letters Flux get ignored.

A1
B2
C3
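For comparison, the static “Flux.zip” variant can be sketched as follows. Without a combinator function it emits `Tuple2` pairs, which are mapped to strings here. The class and method names are made up for illustration.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

public class ZipExample {

    // Zips letters with numbers using the static Flux.zip method.
    static List<String> zippedLettersAndNumbers() {
        final Flux<String> letters = Flux.just("A", "B", "C", "D", "E");
        final Flux<Integer> numbers = Flux.just(1, 2, 3);

        // Without a combinator, Flux.zip pairs the elements into Tuple2 instances.
        final Flux<Tuple2<String, Integer>> pairs = Flux.zip(letters, numbers);

        return pairs
            .map(pair -> pair.getT1() + pair.getT2()) // e.g. "A" + 1 becomes "A1"
            .collectList()
            .block(); // blocking is only acceptable here because this is a demo
    }

    public static void main(String[] args) {
        // prints [A1, B2, C3]; "D" and "E" have no partner and are dropped
        System.out.println(zippedLettersAndNumbers());
    }
}
```

The tuple form is handy when the pair itself is the useful result and no combined value needs to be computed.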

We can conclude that Fluxes offer many different ways to be combined with each other, giving developers more control over their Reactive Streams.