
I am using SSE with Reactor Flux to capture Linux system metrics and publish them to the browser (JavaScript EventSource).

Currently, the problem is that I am not able to send specific data to a specific client. According to several SO posts, the client should register an event listener on the EventSource, and the response from the server should carry the event name under an event key.

Since I am returning an object from the server, is it enough to have event as one of the keys in the response JSON for the EventSource to identify it?

I set a unique ID on the client side, which the server returns in the event key of the JSON response.

Thanks for the help!

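JavaScript:

jsonStreamObjectHeap.addEventListener("197e08e-f9a4-4e6e-9a04-220ade08a8f4", function(e) {
    // e.data holds the event payload as a string; parsing it as JSON is assumed here
    var message = JSON.parse(e.data);
    $.each(message, function(index, value) {
        /* some operation */
    });
});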

Response From Rest Controller:

{
   "event":"197e08e-f9a4-4e6e-9a04-220ade08a8f4",
   "data":"2048"
}

Rest Controller Code:

Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
interval.subscribe((i) -> testStreamList.forEach(testStream -> {
    try {
        generateTestStream(testStream, UUID);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
}));
Flux<List<TestStream>> transactionFlux = Flux.fromStream(Stream.generate(() -> testStreamList));
return Flux.zip(interval, transactionFlux).map(Tuple2::getT2);
The subscriber is the calling client and your application is the publisher, so don't subscribe in the publisher: remove the "subscribe" call. Second, what are generateTestStream and testStreamList? The code is hard to follow. Am I correct to assume that all you want is to return one object from your testStreamList at a given interval (1 sec), and that this object needs to be sent as an SSE? If so, this is a messy way of writing it, and the way you have written it is blocking too. - Toerktumlare
@ThomasAndolf You are correct. Can you please advise how I can proceed, or share any link/help? - Kaustav Ray

1 Answer


I found that org.springframework.http.codec.ServerSentEvent can be plugged into the Flux response returned from the REST controller.

  Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));    
  Flux<HeapStat> transactionFlux = Flux.fromStream(Stream.generate(() -> heapStat));

  /* Building a ServerSentEvent from each tuple, using the session id as the event name */

  return Flux.zip(interval, transactionFlux).map(tuple ->
          ServerSentEvent.<HeapStat>builder().event(jsessionId).data(tuple.getT2()).build()
  );

You can prepare the response just as you would with SseEmitter, with a custom event name and an id (which the browser sends back as Last-Event-ID when it reconnects).
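
To illustrate why an "event" key inside the JSON body is not picked up by addEventListener: EventSource matches on the event: field of the SSE frame itself, not on anything inside the data payload. Below is a minimal plain-Java sketch of what such a frame looks like on the wire. SseFrame and format are hypothetical names used only for illustration, not part of Spring; in the controller above, ServerSentEvent and the framework's encoder take care of this.

```java
// Hypothetical helper (not a Spring class): formats one SSE frame by hand
// to show where the event name lives in the text/event-stream format.
final class SseFrame {
    private SseFrame() {}

    static String format(String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (event != null) {
            // This field is what addEventListener("<name>", ...) matches on.
            sb.append("event: ").append(event).append('\n');
        }
        // Every line of the payload needs its own "data:" field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        // Uses the event name and payload from the question.
        System.out.print(format("197e08e-f9a4-4e6e-9a04-220ade08a8f4", "2048"));
    }
}
```

With a frame like this, the listener registered under the same name in the question's JavaScript fires, and e.data contains only the payload ("2048"). A frame without an event: field is delivered to onmessage instead.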