An introduction to Reactor Kafka

In An introduction to Apache Kafka we looked at Apache Kafka, a distributed streaming platform. This time we’ll look at Reactor Kafka, a library that lets you build Reactive Streams from Project Reactor to Kafka topics, and the other way around.

Our sample applications

We’ll be using two small sample applications: the Paymentprocessor Gateway and the PaymentValidator. The code for these applications can be found here.

The Paymentprocessor gateway offers a small webpage that generates a random credit card number (an obviously fake one, that is), along with an amount to pay. When the user clicks on the submit button, the form gets submitted to the gateway’s API. The API has a Reactive Stream to an unconfirmed-transactions topic on the Kafka cluster.
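Once the gateway is running locally, you can also submit a payment by hand. The snippet below is a sketch: the port and the JSON field names are assumptions based on the sample code’s CreatePaymentCommand, not something the article pins down.

```shell
# Post a fake payment to the gateway's API.
# Assumed: the gateway listens on localhost:8080 and CreatePaymentCommand
# has id, creditCardNumber and amount fields.
PAYLOAD='{"id":1,"creditCardNumber":"4111111111111111","amount":42.00}'

curl -s -X POST http://localhost:8080/payment \
     -H 'Content-Type: application/json' \
     -d "$PAYLOAD" \
  || echo "gateway not reachable on localhost:8080"
```

If the gateway is up, the request completes as soon as the payment has been handed to the Kafka topic, mirroring the `Mono<Void>` the API returns.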

On the other side of that unconfirmed-transactions topic is the PaymentValidator, listening for incoming messages to validate. These messages then go through a reactive pipeline, where a validation method prints them to the command line.

Sending messages to Kafka through Reactive Streams

Our applications are built on top of Spring 5 and Spring Boot 2, enabling us to quickly set up and use Project Reactor.

The goal of the Gateway application is to set up a Reactive Stream from a web controller to the Kafka cluster. This means we need dependencies on Spring WebFlux and reactor-kafka.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

A Spring WebFlux RestController exposes the payment API, creating a Reactive Stream from the web request to the paymentGateway’s doPayment method.

    /**
     * The Mono returned by this call is handed to Spring WebFlux, which relies on a multi-reactor event loop and NIO
     * to handle requests in a non-blocking manner, enabling far more concurrent requests. The result is sent back over
     * HTTP through a mechanism called Server-Sent Events.
     **/

    @PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
        /**
         When calling the doPayment method, we send our payment information, getting a Mono<Void> in return.
         This event will resolve when our payment is sent successfully to the Kafka topic
         **/

        return paymentGateway.doPayment(payment);
    }

The paymentGateway requires a kafkaProducer, which enables us to place messages on a Kafka topic as part of our pipeline. It can be easily created using the KafkaSender.create method, passing a number of producer options.

    public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);

        kafkaProducer = KafkaSender.create(producerOptions);
    }

After its creation, the kafkaProducer can be used to send our message to a Kafka topic of choice, becoming part of the pipeline started in the controller. Because messages are sent to the Kafka cluster in a non-blocking way, we can use Project Reactor’s event loop to receive and route a very large number of concurrent messages from our web API to Kafka.

    @Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);

        String payload = toBinary(payment);

        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        return kafkaProducer.send(Mono.just(message)).then();
    }

    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }

Creating a Reactive Stream from a Kafka topic

There’s not much sense in sending messages to a topic that no one listens to, so our second application will have a reactive pipeline listening to the unconfirmed-transactions topic. To do so, a kafkaReceiver object is created using the KafkaReceiver.create method, similar to how we created the kafkaProducer earlier.

Using the kafkaReceiver.receive method, we can then get a Flux of ReceiverRecords. Each message placed on the topic that we read ends up in a ReceiverRecord. After flowing into the application, it continues through the reactive pipeline, passing the processEvent method, which calls the paymentValidator to print some information to the console. Finally, the acknowledge method is called on its receiverOffset, sending a confirmation to the Kafka cluster that the message has been processed.

    public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;

        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ReceiverOptions<Integer, String> consumerOptions = ReceiverOptions.<Integer, String>create(consumerProps)
                .subscription(Collections.singleton("unconfirmed-transactions"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        kafkaReceiver = KafkaReceiver.create(consumerOptions);

        /**
         * We create a receiver for new unconfirmed transactions
         */

        kafkaReceiver.receive()
                .doOnNext(r -> {
                    /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */

                    final PaymentEvent paymentEvent = fromBinary(r.value(), PaymentEvent.class);
                    processEvent(paymentEvent);
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }

    private void processEvent(PaymentEvent paymentEvent) {
        paymentValidator.calculateResult(paymentEvent);
    }

    private <T> T fromBinary(String object, Class<T> resultType) {
        try {
            return objectMapper.readValue(object, resultType);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
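While developing, it can help to verify by hand that messages actually land on the topic, independent of the reactive consumer. A minimal sketch, assuming a local Kafka installation with its CLI tools on the PATH (the script name varies between distributions):

```shell
TOPIC=unconfirmed-transactions

# Tail the topic with Kafka's stock console consumer;
# --max-messages keeps the command from blocking forever.
if command -v kafka-console-consumer.sh >/dev/null 2>&1; then
  kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic "$TOPIC" --from-beginning --max-messages 5
else
  echo "Kafka CLI tools not found on PATH; skipping"
fi
```

Each line printed is the JSON payload our gateway serialized with toBinary, so this is a quick way to confirm the producer side works before wiring up the validator.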

The code for this example can be found here.
