
I have a requirement that is simple to state, but I cannot pull it off without leaking resources.

I want to return a response of type application/stream+json containing news events that someone posted. I do not want to use WebSockets. That is not because I dislike them; I just want to know how to do it with a stream.

For this I need to return a Flux<News> from my REST controller that is continuously fed with news whenever someone posts any.

My attempt for this was creating a Publisher:

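import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class UpdatePublisher<T> implements Publisher<T> {

    // Subscribers are only ever added here, never removed.
    private List<Subscriber<? super T>> subscribers = new ArrayList<>();

    @Override
    public void subscribe(Subscriber<? super T> s) {
        subscribers.add(s);
    }

    // Pushes the message to every subscriber that was ever registered.
    public void pushUpdate(T message) {
        subscribers.forEach(s -> s.onNext(message));
    }

}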

And a simple News object:

public class News {
    String message;
    // Constructor, getters, some properties omitted for readability...
}

And two endpoints, one to publish news and one to get the stream of news:

// ...

private UpdatePublisher<String> updatePublisher = new UpdatePublisher<>();

@GetMapping(value = "/news/ticker", produces = "application/stream+json")
public Flux<News> getUpdateStream() {
    return Flux.from(updatePublisher).map(News::new);
}

@PutMapping("/news")
public void putNews(@RequestBody News news) {
    updatePublisher.pushUpdate(news.getMessage());
}

This works, but I cannot unsubscribe or get hold of any given subscription again. Once a client disconnects, the updatePublisher just keeps pushing to a growing number of dead channels, because I have no way to call onComplete() on the subscribers.

TL;DR:

Can one push messages onto a possibly endless Flux from a different thread and still terminate the Flux on demand, without relying on a "connection reset by peer" exception or something along those lines?


1 Answer


You should never try to implement the Publisher interface yourself, because doing so means getting the whole Reactive Streams contract right (subscription handling, demand, cancellation). That is exactly the issue you're facing here.

Instead you should use one of the generator operators provided by Reactor itself (this is actually a Reactor question, nothing specific to Spring WebFlux).

In this case, Flux.create or Flux.push are probably the best candidates, given that your code uses some kind of event listener to push events down the stream. See the Project Reactor reference documentation for details.
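What a library-provided publisher adds over the hand-rolled one is a Subscription per subscriber, so a cancel signal removes that subscriber instead of leaving a dead entry in a list. As a dependency-free sketch of that mechanism (not the Reactor code recommended above), the block below swaps in the JDK's java.util.concurrent.SubmissionPublisher (JDK 9+), which follows the same Reactive Streams contract. The names TickerSketch and CollectingSubscriber, the buffer size of 16 and the same-thread executor are assumptions made only for this demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; none of these names come from the question.
class TickerSketch {

    /** Records what it receives and keeps its Subscription so it can cancel later. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        Flow.Subscription subscription;
        Throwable error;
        boolean completed;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(Long.MAX_VALUE); // unbounded demand keeps the demo simple
        }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    /** Returns the subscriber count before and after one subscriber cancels. */
    static int[] run(CollectingSubscriber stays, CollectingSubscriber leaves) {
        // Assumption: Runnable::run delivers on the calling thread, which keeps
        // this demo deterministic; a server would use a real executor.
        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(Runnable::run, 16)) {
            publisher.subscribe(stays);
            publisher.subscribe(leaves);
            publisher.submit("first");
            int before = publisher.getNumberOfSubscribers();

            leaves.subscription.cancel();   // the signal a departing client should result in
            publisher.submit("second");     // only reaches the remaining subscriber
            int after = publisher.getNumberOfSubscribers();
            return new int[] {before, after};
        } // close() completes the subscribers that are still attached
    }

    public static void main(String[] args) {
        CollectingSubscriber stays = new CollectingSubscriber();
        CollectingSubscriber leaves = new CollectingSubscriber();
        int[] counts = run(stays, leaves);
        System.out.println("subscribers before/after cancel: " + counts[0] + "/" + counts[1]);
        System.out.println("stays received:  " + stays.received);
        System.out.println("leaves received: " + leaves.received);
    }
}
```

With Reactor the same cleanup hook is reached inside Flux.create by registering a callback on the sink (for example FluxSink.onDispose) that removes your event listener when the subscriber cancels or the stream terminates.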

Without more details, it's hard to give you a concrete code sample that solves your problem. Here are a few pointers, though:

  • You might want to .share() the stream of events among all subscribers if you'd like a multicast-like communication pattern.
  • Pay attention to the push/pull/push+pull model that you'd like to have here. How is backpressure supposed to work? What happens if you produce more events than the subscribers can handle?
  • This model only works on a single application instance. If you'd like it to work across multiple application instances, you might want to look into messaging patterns using a broker.
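The backpressure pointer can be made concrete without Reactor. The sketch below, again using the JDK's SubmissionPublisher as a stand-in rather than Reactor itself, gives a subscriber that has requested nothing a small bounded buffer and counts how many events are dropped instead of being queued without limit. BackpressureSketch, SlowSubscriber, pull and offerAll are hypothetical names, and dropping is only one possible policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class illustrating one overflow policy (drop when full).
class BackpressureSketch {

    /** A slow client: it requests nothing until pull() is called. */
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }

        void pull(long n) { subscription.request(n); }
    }

    /** Offers the numbers 0..total-1 and returns how many were dropped. */
    static int offerAll(SubmissionPublisher<Integer> publisher, int total) {
        int dropped = 0;
        for (int i = 0; i < total; i++) {
            // offer() does not block; the handler returning false means "do not retry".
            // A negative result is minus the number of subscribers that dropped the item.
            int result = publisher.offer(i, (subscriber, item) -> false);
            if (result < 0) {
                dropped += -result;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Assumption: same-thread executor and a buffer of 4, chosen for the demo only.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(Runnable::run, 4)) {
            SlowSubscriber slow = new SlowSubscriber();
            publisher.subscribe(slow);
            int dropped = offerAll(publisher, 10);
            slow.pull(Long.MAX_VALUE); // drain whatever was buffered
            System.out.println("dropped=" + dropped + ", delivered=" + slow.received);
        }
    }
}
```

In Reactor the corresponding decision is the FluxSink.OverflowStrategy passed as the second argument to Flux.create (BUFFER, DROP, LATEST, ERROR or IGNORE); which one fits depends on whether a news ticker may skip events for slow clients.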