An introduction to Reactive Websockets

WebSockets have been in use for a number of years. The technology is battle-tested and has proven to be a reliable way to support full-duplex, real-time communication between a client and a server, or between applications in general. Full-duplex means that both sides (client and server) can take the initiative in sending messages.

This is a real advantage over other client-server communication techniques. Server-sent events, for example, let a server continually push messages to a client, but the same stream can’t be used to receive information from the client. WebSockets can do both, and since Reactive Programming is naturally oriented around data streams, the two are a great fit.

To illustrate the usage of WebSockets, we’ll build a small web application using Spring 5. Our web client will send a string of text to the server through a WebSocket, and the server will reverse this string and stream it back to the client.

A screenshot of the websocket client

To start off with, we create a standard Spring Boot Application.

We also require a couple of dependencies, which we add to our pom.xml file.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <dependency>
        <groupId>org.thymeleaf</groupId>
        <artifactId>thymeleaf-spring5</artifactId>
    </dependency>
</dependencies>

Our Spring Boot application class ties everything together. It also declares a thymeleafTemplateResolver bean, which will pick up our web client template later on.

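import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.thymeleaf.spring5.templateresolver.SpringResourceTemplateResolver;
import org.thymeleaf.templatemode.TemplateMode;

@SpringBootApplication
public class ReactiveWebsocketExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveWebsocketExampleApplication.class, args);
    }

    // Resolves view names such as "index" to classpath:templates/index.html
    @Bean
    public SpringResourceTemplateResolver thymeleafTemplateResolver() {
        final SpringResourceTemplateResolver resolver = new SpringResourceTemplateResolver();
        resolver.setPrefix("classpath:templates/");
        resolver.setSuffix(".html");
        resolver.setTemplateMode(TemplateMode.HTML);
        return resolver;
    }
}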

We also add a MainController class, which serves our web client page.

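import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import reactor.core.publisher.Mono;

@Controller
public class MainController {

    // Returns the view name "index", which the template resolver maps to index.html
    @GetMapping(value = "/")
    public Mono<String> home() {
        return Mono.just("index");
    }

}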

After adding these, we can add our web client files to the resources folder. We need a simple index.html page and a webSocketConnection.js file.

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Reactive WebSockets Example</title>
    </head>
    <body onload="openWebSocket()">
        <div>
            <input id="input">
            <button onclick="sendString()">Send</button>
        </div>
        <br />
        <textarea id="result" style="min-height: 200px; min-width: 400px"></textarea>
        <script type="text/javascript" src="webSocketConnection.js"></script>
    </body>
</html>

// webSocketConnection.js
var webSocket;

// Append a line to the result textarea.
function logMessage(msg) {
    document.getElementById("result").value += msg + "\n";
}

function openWebSocket() {
    webSocket = new WebSocket("ws://localhost:8080/stringConverter");

    webSocket.onopen = function () {
        logMessage("Opened the Websocket Connection!");
    };

    webSocket.onmessage = function (event) {
        logMessage("Result: " + event.data);
    };
}

// Send the current contents of the input field to the server.
function sendString() {
    var input = document.getElementById("input").value;
    webSocket.send(input);
}

Now that our web client is in place, we only have to add the server side of the WebSocket. To configure it we need a WebSocketConfiguration class. It maps a URL to our WebSocket handler, so we can use that handler to create a reactive stream.

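import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

@Configuration
public class WebSocketConfiguration {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

    // Routes WebSocket handshakes on /stringConverter to our handler
    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/stringConverter", webSocketHandler);
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
        simpleUrlHandlerMapping.setOrder(10);
        simpleUrlHandlerMapping.setUrlMap(map);
        return simpleUrlHandlerMapping;
    }
}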

Our configuration requires a WebSocketHandler, which we implement with the following code.

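import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class WebSocketHandlers implements WebSocketHandler {

    public Flux<String> reverseString(String userInput) {
        return Flux.just(new StringBuilder(userInput).reverse().toString());
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Return the Mono from send() rather than subscribing manually, so the
        // framework subscribes and handling stays tied to the outbound stream.
        return session.send(session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(this::reverseString)
                .map(session::textMessage));
    }
}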

This also shows what could be considered the most important bit of this article: the WebSocketHandler’s handle method. In it, session.receive() is called, which returns a Flux of the incoming WebSocket messages. That Flux is mapped through WebSocketMessage::getPayloadAsText, which gives us the text payload of each message.

We run each payload through the reverseString method (using flatMap, although in this scenario a simple map would have worked as well, since reverseString always emits exactly one value). Finally, a last map turns each reversed string into a text message to be sent. The result is a Flux of outgoing messages, which is passed as an argument to session.send() so that it streams back to the user.
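To see why flatMap and map are interchangeable here, the shape of the pipeline can be mimicked with plain JDK streams. This is only a sketch under stated assumptions: Stream stands in for Flux and runs synchronously, the "text:" prefix is a made-up stand-in for session.textMessage(...), and the class and method names are hypothetical rather than part of the demo project.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: mirrors the map -> flatMap -> map shape of handle()
// using java.util.stream.Stream in place of Reactor's Flux.
class ReversePipelineSketch {

    // Same logic as reverseString, but returning a one-element Stream.
    static Stream<String> reverseString(String userInput) {
        return Stream.of(new StringBuilder(userInput).reverse().toString());
    }

    // flatMap variant: each payload becomes a one-element stream that is flattened.
    static List<String> viaFlatMap(List<String> payloads) {
        return payloads.stream()
                .flatMap(ReversePipelineSketch::reverseString)
                .map(reversed -> "text:" + reversed) // stand-in for session.textMessage
                .collect(Collectors.toList());
    }

    // map variant: reverses in place, no inner stream needed.
    static List<String> viaMap(List<String> payloads) {
        return payloads.stream()
                .map(payload -> new StringBuilder(payload).reverse().toString())
                .map(reversed -> "text:" + reversed) // stand-in for session.textMessage
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> payloads = List.of("hello", "websocket");
        System.out.println(viaFlatMap(payloads)); // [text:olleh, text:tekcosbew]
        System.out.println(viaMap(payloads));     // [text:olleh, text:tekcosbew]
    }
}
```

Because the inner stream always holds exactly one element, flattening it yields the same sequence as mapping directly. flatMap only becomes necessary when the per-message work itself produces zero, many, or asynchronous results.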

With this in place, start the application and open http://localhost:8080/ in a browser. Text sent from the input field should come back reversed in the text area.
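The endpoint can also be exercised without a browser. The following is a minimal sketch using the JDK's java.net.http.WebSocket client (Java 11 or later), assuming the server from this article is listening on ws://localhost:8080/stringConverter. StringConverterClient and MessageAssembler are hypothetical names introduced for illustration and are not part of the demo code.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Hypothetical client; assumes the article's server is running locally.
class StringConverterClient {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> reply = new CompletableFuture<>();
        MessageAssembler assembler = new MessageAssembler();

        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                // A text message may arrive in several fragments; complete on the last one.
                assembler.accept(data, last).ifPresent(reply::complete);
                webSocket.request(1); // signal that we are ready for the next message
                return null;
            }
        };

        WebSocket webSocket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create("ws://localhost:8080/stringConverter"), listener)
                .join();

        webSocket.sendText("hello", true).join();
        System.out.println("Result: " + reply.get(5, TimeUnit.SECONDS));
        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
}

// Hypothetical helper: joins the fragments of one text message.
class MessageAssembler {
    private final StringBuilder buffer = new StringBuilder();

    // Returns the whole message once the final fragment has arrived.
    Optional<String> accept(CharSequence fragment, boolean last) {
        buffer.append(fragment);
        if (!last) {
            return Optional.empty();
        }
        String message = buffer.toString();
        buffer.setLength(0); // reset for the next message
        return Optional.of(message);
    }
}
```

The fragment handling is kept in its own small class so it can be checked without a running server; the network part only works once the application has been started.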

Download the code

A special thanks to Lennert Wieërs for implementing the demo code.