Reactive Spring Data MongoDB: Tailable Cursors

In our previous article on the new Reactive Spring Data, we took a basic look at what MongoDB had to offer through its Reactive driver. We were able to send information to our database and receive it back in an event-driven way. Instead of having to wait for the full result of a database call, we received our data as a stream of events, so our Threads did not have to block but could process the data as it flowed into our application.

In this article we’ll take this a step further with a MongoDB feature called tailable cursors. These cursors let us receive information from the database continuously: we can use them to run a query that keeps returning results until it is explicitly closed. Keep in mind, though, that they can only be used on capped collections, which behave rather differently from the collections you generally use in MongoDB.

When visualising a regular collection in MongoDB, it might be best to picture a filing cabinet. Documents can be put into it and taken out again to read or update them. Capped collections, however, are different: picture a cabinet with a paper shredder built into it. When we create a new capped collection, we have to set the maximum size it can grow to in bytes, and we can optionally also cap the maximum number of documents. If adding a document would make the cabinet grow beyond those limits, the oldest document is run through the shredder to make room for the new one. This means a capped collection should not be used for long-term storage of documents. Another thing to keep in mind is that once a document has been added to a capped collection, a user can no longer delete it; it only disappears when it is evicted to make room for newer documents.
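To make the shredder analogy concrete, here is a minimal in-memory sketch in plain Java. It is not the MongoDB API: `CappedCollectionSketch`, `insert`, `delete` and `findAll` are hypothetical names, and the model simply assumes a FIFO buffer bounded by a byte budget and a document count. Following the description above, explicit deletes are rejected.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical in-memory model of a capped collection: a FIFO buffer bounded
 * by total size in bytes and by document count. Not the MongoDB API.
 */
class CappedCollectionSketch {
    private final long maxBytes;
    private final long maxDocuments;
    private final Deque<String> documents = new ArrayDeque<>();
    private long currentBytes = 0;

    CappedCollectionSketch(long maxBytes, long maxDocuments) {
        this.maxBytes = maxBytes;
        this.maxDocuments = maxDocuments;
    }

    /** Appends a document, then evicts the oldest ones until both limits hold again. */
    void insert(String document) {
        documents.addLast(document);
        currentBytes += document.getBytes(StandardCharsets.UTF_8).length;
        while (documents.size() > maxDocuments || currentBytes > maxBytes) {
            String oldest = documents.removeFirst(); // run through the "shredder"
            currentBytes -= oldest.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    /** In this model documents can only leave the collection through eviction. */
    void delete(String document) {
        throw new UnsupportedOperationException("explicit deletes are not allowed in this model");
    }

    /** Snapshot of the documents in insertion order. */
    List<String> findAll() {
        return new ArrayList<>(documents);
    }

    public static void main(String[] args) {
        CappedCollectionSketch tickets = new CappedCollectionSketch(1_000, 3);
        for (String artist : new String[] {"ABBA", "Hawkwind", "Queen", "Yes"}) {
            tickets.insert(artist);
        }
        System.out.println(tickets.findAll()); // prints [Hawkwind, Queen, Yes]: "ABBA" was evicted
    }
}
```

Limiting by bytes and by count in the same loop keeps both constraints in one place: whichever limit is exceeded first triggers eviction of the oldest document.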

This behaviour makes capped collections well suited to certain types of applications. Think, for example, of applications where you want to keep a real-time view of the latest events, like chat messages or stock updates. Instead of running queries over and over again, we can open a tailable cursor on the collection, which keeps feeding us new elements as they are added while the old ones are gradually forgotten. With the concepts behind reactive data streams in mind, it’s easy to see how tailable cursors let events flow further through your application, keeping the flow of information open in both directions.
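The idea of "tailing" can be sketched the same way: a cursor remembers the position of the last document it returned, and each time it is asked for more it hands back only what was inserted since then, while the collection itself keeps forgetting its oldest entries. This is a hypothetical model (`CappedLog`, `Cursor` and `nextBatch` are made-up names, and records need Java 16 or later); the real cursor lives on the MongoDB server, and with the reactive driver new results are pushed to us rather than polled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of tailing a capped collection; not the MongoDB driver API. */
class TailableCursorSketch {

    /** A stored document together with its insertion sequence number. */
    record Entry(long sequence, String document) {}

    /** Append-only log bounded by document count, like a capped collection. */
    static class CappedLog {
        private final int maxDocuments;
        private final Deque<Entry> entries = new ArrayDeque<>();
        private long nextSequence = 0;

        CappedLog(int maxDocuments) {
            this.maxDocuments = maxDocuments;
        }

        void insert(String document) {
            entries.addLast(new Entry(nextSequence++, document));
            if (entries.size() > maxDocuments) {
                entries.removeFirst(); // the oldest document is forgotten
            }
        }

        /** Opens a cursor that will first return everything currently stored. */
        Cursor tail() {
            return new Cursor(this);
        }
    }

    /** Remembers the last sequence number it returned, like a cursor position. */
    static class Cursor {
        private final CappedLog log;
        private long lastSeen = -1;

        Cursor(CappedLog log) {
            this.log = log;
        }

        /** Returns every document inserted after the previous call; empty if nothing is new. */
        List<String> nextBatch() {
            List<String> batch = new ArrayList<>();
            for (Entry entry : log.entries) {
                if (entry.sequence() > lastSeen) {
                    batch.add(entry.document());
                    lastSeen = entry.sequence();
                }
            }
            return batch;
        }
    }

    public static void main(String[] args) {
        CappedLog chat = new CappedLog(100);
        chat.insert("hello");
        chat.insert("hi there");
        Cursor cursor = chat.tail();
        System.out.println(cursor.nextBatch()); // [hello, hi there]
        chat.insert("new message");
        System.out.println(cursor.nextBatch()); // [new message]: only what arrived since the last call
        System.out.println(cursor.nextBatch()); // []: nothing new yet, but the cursor is still usable
    }
}
```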

Now that we know a bit more about the behaviour of capped collections and the uses of tailable cursors, let’s dive into the code. We’ve based it on the codebase of our previous article on the new Reactive Spring Data, with a couple of modifications.

First of all, we added a new method to our repository: a tailable version of the findBy method. This builds on the typical Spring Data model, where the implementation of an interface method is derived automatically from its name and arguments.

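import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.Tailable;
import reactor.core.publisher.Flux;

public interface ConcertTicketRepository extends ReactiveMongoRepository<ConcertTicket, Long> {

    // Derived query over all documents, executed with a tailable cursor
    @Tailable
    Flux<ConcertTicket> findWithTailableCursorBy();

}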

The @Tailable annotation is required for a tailable cursor method. As we’d expect, the method returns a Flux of results, and because the cursor stays open, documents that are inserted after the initial results have been emitted are signaled through the same Flux as well.

After declaring the findWithTailableCursorBy method, we can use it in the rest of our example, which is again based on the one from our previous article on the new Reactive Spring Data.

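import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.mongodb.core.CollectionOptions;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import reactor.core.publisher.Mono;

@SpringBootApplication
@EnableReactiveMongoRepositories
public class ReactiveSpringDataStartApplication {

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext application = SpringApplication.run(ReactiveSpringDataStartApplication.class, args);

        final MongoOperations mongoOperations = application.getBean(MongoOperations.class);
        final ConcertTicketRepository concertTicketRepository = application.getBean(ConcertTicketRepository.class);

        // Start from a clean slate: drop any existing collection with the same name
        if (mongoOperations.collectionExists("concertTicket")) {
            mongoOperations.dropCollection("concertTicket");
        }

        // Capped collections need to be created manually: max size in bytes and max number of documents
        mongoOperations.createCollection("concertTicket", CollectionOptions.empty().capped().size(9999999L).maxDocuments(100L));

        // Nothing is saved until these Monos are subscribed to
        final Mono<ConcertTicket> saveTicketOne = concertTicketRepository.save(new ConcertTicket("ABBA", "Jerry"));
        final Mono<ConcertTicket> saveTicketTwo = concertTicketRepository.save(new ConcertTicket("Hawkwind", "Kris"));

        saveTicketOne.subscribe();

        // Give the first save time to complete before opening the tailable cursor
        Thread.sleep(200);

        // Emits the tickets already stored and keeps emitting tickets inserted later on
        concertTicketRepository.findWithTailableCursorBy()
            .flatMap(ticket -> printTicketInformation(ticket))
            .subscribe();

        System.out.println("Let's wait a bit before saving the second ticket, the tailable cursor stays open for new events");
        Thread.sleep(2000);

        saveTicketTwo.subscribe();

        System.out.println("Will wait for the information to be printed from the database");
        Thread.sleep(1000);
    }

    // Prints the ticket details and passes the ticket on unchanged
    private static Mono<ConcertTicket> printTicketInformation(ConcertTicket concertTicket) {
        System.out.println(String.format("Ticket Artist: %s Buyer: %s", concertTicket.getArtist(), concertTicket.getBuyer()));
        return Mono.just(concertTicket);
    }
}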

Like last time, we start by getting the required beans. In this case these are not only the Reactive Spring Data repository we just extended, but also a MongoOperations bean. This bean lets us create a new capped collection (first dropping any existing collection with the same name), for which we define the maximum size it can grow to: at most 9999999 bytes and 100 documents.

After setting up the collection, we define two reactive streams (Monos), each of which creates a new ConcertTicket and saves it to the database. Nothing is saved until a Mono is subscribed to. We trigger the first one with a subscription and then wait a little to make sure the ticket has been stored.
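The short pause before opening the cursor is worth a note. According to the MongoDB documentation on tailable cursors, a tailable cursor becomes dead when its query returns no match, so a tailable query opened on a still-empty capped collection is closed right away instead of waiting for future inserts. The sketch below models only that rule in plain Java; `CappedCollection`, `TailableCursor`, `isDead` and `nextBatch` are hypothetical names, and eviction is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the rule that a tailable query matching nothing when
 * it is opened yields a dead cursor. Not the MongoDB driver API.
 */
class DeadCursorSketch {

    /** Minimal stand-in for a capped collection (no eviction in this model). */
    static class CappedCollection {
        final List<String> documents = new ArrayList<>();

        void insert(String document) {
            documents.add(document);
        }
    }

    /** Tailable cursor that is dead from the start if nothing matched when it was opened. */
    static class TailableCursor {
        private final CappedCollection collection;
        private final boolean dead;
        private int position = 0;

        TailableCursor(CappedCollection collection) {
            this.collection = collection;
            this.dead = collection.documents.isEmpty(); // no initial match: closed immediately
        }

        boolean isDead() {
            return dead;
        }

        /** Returns the documents added since the previous call. */
        List<String> nextBatch() {
            if (dead) {
                throw new IllegalStateException("cursor is dead; the query has to be re-issued");
            }
            int end = collection.documents.size();
            List<String> batch = new ArrayList<>(collection.documents.subList(position, end));
            position = end;
            return batch;
        }
    }

    public static void main(String[] args) {
        CappedCollection tickets = new CappedCollection();
        TailableCursor tooEarly = new TailableCursor(tickets);   // opened before the first save
        tickets.insert("ABBA");
        TailableCursor afterSave = new TailableCursor(tickets);  // opened after the first save
        tickets.insert("Hawkwind");
        System.out.println(tooEarly.isDead());     // true
        System.out.println(afterSave.nextBatch()); // [ABBA, Hawkwind]
    }
}
```

Under this assumption, storing the first ticket before calling findWithTailableCursorBy is what keeps the cursor alive long enough to see the second one.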

Next, we call our new findWithTailableCursorBy method on the repository, which signals all the ConcertTickets currently in the collection and keeps signaling new ones as they get added. This means that another user, or even another application, could be adding documents, and they would keep streaming in as events from the Flux we just created. Like last time, we route the events coming from this Flux through the printTicketInformation method, which prints some information about each ticket so we can see the effects more clearly.
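Conceptually, this subscription behaves like a publisher that does not complete on its own. As a JDK-only approximation, the sketch below uses `java.util.concurrent.SubmissionPublisher` as a stand-in for the Flux returned by findWithTailableCursorBy and submits the two tickets by hand. `TailablePushSketch`, `TicketPrinter` and `run` are hypothetical names, records require Java 16 or later, and in the real application it is the tailable cursor, not our own code, that pushes the documents.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the tailable Flux, built only on java.util.concurrent.Flow. */
class TailablePushSketch {

    record ConcertTicket(String artist, String buyer) {}

    /** Prints each ticket as it arrives, mirroring printTicketInformation. */
    static class TicketPrinter implements Flow.Subscriber<ConcertTicket> {
        final List<String> printed = new CopyOnWriteArrayList<>();
        final CountDownLatch received;

        TicketPrinter(int expected) {
            this.received = new CountDownLatch(expected);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand, like a plain subscribe()
        }

        @Override
        public void onNext(ConcertTicket ticket) {
            String line = String.format("Ticket Artist: %s Buyer: %s", ticket.artist(), ticket.buyer());
            System.out.println(line);
            printed.add(line);
            received.countDown();
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            // Only reached when the publisher is closed, comparable to closing the cursor
        }
    }

    static List<String> run() {
        try (SubmissionPublisher<ConcertTicket> tailableCursor = new SubmissionPublisher<>()) {
            TicketPrinter printer = new TicketPrinter(2);
            tailableCursor.subscribe(printer);
            tailableCursor.submit(new ConcertTicket("ABBA", "Jerry"));     // already in the collection
            Thread.sleep(200);                                              // the stream stays open...
            tailableCursor.submit(new ConcertTicket("Hawkwind", "Kris"));  // ...so a later insert still arrives
            if (!printer.received.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tickets were not delivered in time");
            }
            return List.copyOf(printer.printed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        run();
    }
}
```

Delivery happens on a separate thread (the common ForkJoinPool by default), which is why the sketch waits on a latch instead of assuming the tickets are printed by the time submit returns.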

Then we wait two seconds, long enough for the existing ticket to be printed and to show that the cursor stays open in the meantime. After that we subscribe to the second saveTicket Mono, which triggers it and stores the second ConcertTicket in the capped collection as well. The tailable cursor picks up this new document and passes it as an event through the Flux, so its ticket information is printed too.

Let's wait a bit before saving the second ticket, the tailable cursor stays open for new events
Ticket Artist: ABBA Buyer: Jerry
Will wait for the information to be printed from the database
Ticket Artist: Hawkwind Buyer: Kris

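The output of our application confirms the expected behaviour. The first message appears before the ABBA ticket because subscribe() returns immediately and the query results arrive asynchronously. The output also shows how tailable cursors let documents that are added to a capped collection later on stream into our application as well, enabling real-time features with Reactive MongoDB.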

This sample application can be found on GitHub.