Flux Caching in Project Reactor: Replaying past data

In the article Flux Sharing in Project Reactor: From one to many we looked at how we can attach multiple Fluxes to an initial Flux by sharing it, creating multiple substreams from our original Reactive stream. This offers quite a bit of extra power. We can “feed” data to different subsystems that consume at their own pace through backpressure, while multiple users receive the same data streamed in. This enables massively concurrent use of message pipelines.

The way we implemented this scenario in the last article has one big disadvantage, though. After we subscribe a new Flux to a ConnectableFlux, it will only receive new messages. It won’t get the messages that went through the stream before. If you’re building a chatroom, for example, there might be a requirement for some historical data as well. To achieve this, you can apply the concept of Flux caching.

By calling one of the “cache” methods on a Flux, it will keep an internal store of the items that pass through it until they become invalid. This means that when a new Subscriber subscribes to the Flux, it will also receive the relevant historical data.
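As a minimal sketch of that replay behaviour, here is a finite, cold Flux (simpler than the timed stream used below; the class name is just for illustration). The first subscriber triggers the source, and a later subscriber replays the full cached history:

import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class CacheReplayDemo {
    public static void main(String[] args) {
        // cache() records every item so late subscribers can replay the history
        Flux<String> cached = Flux.just("a", "b", "c").cache();

        List<String> first = new ArrayList<>();
        cached.subscribe(first::add);   // triggers the source: a, b, c

        List<String> second = new ArrayList<>();
        cached.subscribe(second::add);  // replays the cached history: a, b, c

        System.out.println("first:  " + first);
        System.out.println("second: " + second);
    }
}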

In the code below we create a Flux that will generate a random number every second, and start listening on it. After 5 seconds we attach a second Flux to it, which will start printing out the same numbers.

Flux<Long> startFlux = Flux.interval(Duration.ofSeconds(1)).share();

Flux<Long> firstFlux = Flux.from(startFlux);
firstFlux.subscribe(out -> System.out.println("firstFlux value: " + out));
new CountDownLatch(1).await(5, TimeUnit.SECONDS);

Flux<Long> secondFlux = Flux.from(startFlux);
secondFlux.subscribe(out -> System.out.println("secondFlux value: " + out));
new CountDownLatch(1).await(5, TimeUnit.SECONDS);

When we look at the command line, we can see the following result. The secondFlux values are the same as those from the firstFlux, because it’s “plugged in” on the startFlux, but the numbers from before are lost.

firstFlux value: 0
firstFlux value: 1
firstFlux value: 2
firstFlux value: 3
firstFlux value: 4
firstFlux value: 5
secondFlux value: 5
firstFlux value: 6
secondFlux value: 6
firstFlux value: 7
secondFlux value: 7
firstFlux value: 8
secondFlux value: 8
firstFlux value: 9
secondFlux value: 9

Through a small code change we can change this behaviour. We simply call the cache() method on the original Flux, so it will keep previous values in an internal cache.

Flux<Long> startFlux = Flux.interval(Duration.ofSeconds(1)).share().cache();

Flux<Long> firstFlux = Flux.from(startFlux);
firstFlux.subscribe(out -> System.out.println("firstFlux value: " + out));
new CountDownLatch(1).await(5, TimeUnit.SECONDS);

Flux<Long> secondFlux = Flux.from(startFlux);
secondFlux.subscribe(out -> System.out.println("secondFlux value: " + out));
new CountDownLatch(1).await(5, TimeUnit.SECONDS);

When we run this code, we get the following results. We can see that the secondFlux streams in the older data from startFlux right after it connects.

firstFlux value: 0
firstFlux value: 1
firstFlux value: 2
firstFlux value: 3
firstFlux value: 4
secondFlux value: 0
secondFlux value: 1
secondFlux value: 2
secondFlux value: 3
secondFlux value: 4
firstFlux value: 5
secondFlux value: 5
firstFlux value: 6
secondFlux value: 6
firstFlux value: 7
secondFlux value: 7
firstFlux value: 8
secondFlux value: 8
firstFlux value: 9
secondFlux value: 9

The cache method is overloaded with a couple of different arguments, like a time-to-live duration or a maximum number of items that can be held in the cache until the old ones become invalid and are removed from the cache.
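As a sketch of one of these overloads (the class name here is just for illustration), cache(2) keeps only the two most recent items, so a late subscriber only replays the cached tail:

import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class BoundedCacheDemo {
    public static void main(String[] args) {
        // cache(2): only the 2 most recent items are kept for late subscribers
        Flux<Integer> source = Flux.range(1, 5).cache(2);

        List<Integer> first = new ArrayList<>();
        source.subscribe(first::add);   // triggers the source and sees 1..5

        List<Integer> second = new ArrayList<>();
        source.subscribe(second::add);  // only the cached tail is replayed

        System.out.println("first:  " + first);   // [1, 2, 3, 4, 5]
        System.out.println("second: " + second);  // [4, 5]
    }
}

The time-based variant, cache(Duration), works the same way but invalidates items by age instead of by count.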

As we can see, Flux caching can be a powerful mechanism when building an application that should support massive concurrency. New Subscribers to the Flux won’t only get the new information, but also the information from before they subscribed. Some use cases where this could be applied well are chatrooms, stock tickers, IoT device monitoring, etc.

The code for this demo can be found on GitHub.