12

I've been learning a bit about Spring 5 WebFlux, reactive programming and websockets. I've watched Josh Long's Spring Tips: Reactive WebSockets with Spring Framework 5. The code that sends data from server to client through a WebSocket connection uses a Spring Integration IntegrationFlow that publishes to a PublishSubcribeChannel which has a custom MessageHandler subscribed to it that takes the message, converts it to an object that is then converted to Json and emitted to the FluxSink from the callback supplied to Flux.create(), which is used to send to the WebSocketConnection.

I was wondering if the use of IntegrationFlow and PublishSubscribeChannel is the recommended way to push events from a background process to the client, or if this is just more convenient in this particular example (monitoring the file system). I'd think if you have control over the background process, you could have it emit to the FluxSink directly?

I'm thinking about use cases similar to the following:

  • a machine learning process whose progress is monitored
  • updates to the state of a game world that are sent to players
  • chat rooms / team collaboration software
  • ...
3
  • Quite interested in the answer to this as well. Reactor's Flux isn't a message bus, so maybe that's why a Channel is more appropriate. Commented Dec 19, 2017 at 15:41
  • Quite interested in an answer as well Commented Aug 17, 2018 at 9:25
  • I don't see any reason not to publish background events directly to a FluxSink. It seems quite a bit simpler to do as far as implementation. Commented Sep 14, 2022 at 22:04

2 Answers 2

3

What I've done in the past that has worked for me is to create a Spring Component that implements WebSocketHandler:

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

Then in the handle method, Spring injects the WebSocketSession object

@Override
public Mono<Void> handle(WebSocketSession session) {

Then create one or more Flux reactive publishers that emit messages(WebSocketMessage) for the client.

    final var output = session.send(Flux.merge(flux1, flux2));

Then you can zip up the incoming and outgoing Flux objects in a Mono and then Spring will take it from there.

        return Mono.zip(incomingWebsocketMsgResponse.getWebSocketMsgFlux().then(),
                outputWithErrorMsgs)
                .then();

Example: https://howtodoinjava.com/spring-webflux/reactive-websockets/

1

Since this question, Spring introduced RSocket support - you might think about it like the WebSocket STOMP support existing in Spring MVC, but much more powerful and efficient, supporting backpressure and advanced communication patterns at the protocol level.

For the use cases you're mentioning, I'd advise using RSocket as you'd get a powerful programming model with @MessageMapping and all the expected support in Spring (codecs for JSON and CBOR, security, etc).

3
  • 1
    RSocket is great since it brings networking in as a first-class citizen into the reactive paradigm. However, RSocket and Websockets are not mutually exclusive. RSocket is a protocol that runs on TCP or Websockets. The WebSocket protocol runs on layer 7 of the OSI model like HTTP/2 while RSocket runs on OSI Layer 5/6.
    – anataliocs
    Commented Dec 1, 2020 at 22:52
  • 1
    unfortunately the RSocket websocket client for javascript is currently only at 0.0.19 and already 10 months old. see npmjs.com/package/rsocket-websocket-client
    – herman
    Commented Dec 2, 2020 at 14:23
  • Rsocket itself is only at 0.2
    – anataliocs
    Commented Dec 2, 2020 at 22:21

Not the answer you're looking for? Browse other questions tagged or ask your own question.