How can I implement an internal event bus to do async operations in a webflux spring stack?

I want a service to emit an event:

@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
  }
}

A different component, unknown to the publishing service, should be able to decide to react to that event.

@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
  override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
    // do stuff
  }
}

In an MVC application I would use ApplicationEventPublisher to publish the event (publishEvent) and @EventListener + @Async on the handler (onDeleteEntry).

What is the equivalent in a reactive stack?

The other option I am considering is running an embedded messaging service, because that would imply async semantics. But this feels like a lot of overhead for a simple scenario.


I found these SO threads

but they don't answer this scenario, because they assume that the listener is known to the publisher. I need loose coupling.

I also found these spring issues

In particular, one comment looks promising. It suggests this:

Mono.fromRunnable(() -> context.publishEvent(...))

From what I understand, I could then just use @EventListener, since I am totally fine with not propagating the reactive context.

Can someone please explain the threading implications, and whether this is even legitimate in a reactive stack?


UPDATE

From testing it feels fine to do this:

@Service
class FeedServiceImpl(
  val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
  @EventListener
  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler started")
    runBlocking {
      // do stuff that takes some time
      delay(1000)
    }
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    applicationEventPublisher.publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }
}

Note that handle is not a suspend function: an @EventListener method must have a single argument, and coroutines introduce a continuation parameter behind the scenes. The handler then starts a new blocking coroutine scope with runBlocking, which is fine because @Async runs it on a different thread.

Output is:

2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl  : ThreadId: 38
2021-05-13 12:15:20.755  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler started
2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl   : Publisher done
2021-05-13 12:15:21.758  INFO 20252 --- [         task-1] ...FeedServiceImpl   : ThreadId: 54
2021-05-13 12:15:21.759  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler done

UPDATE 2

The other approach without using @Async would be this:

  @EventListener
//  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler start")
    log.info("Handler ThreadId: ${Thread.currentThread().id}")
    runBlocking {
      log.info("Handler block start")
      delay(1000)
      log.info("Handler block ThreadId: ${Thread.currentThread().id}")
      log.info("Handler block end")
    }
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    Mono.fromRunnable<Unit> {
      applicationEventPublisher.publishEvent(
        FeedEntryDeletedEvent(
          timestamp = time.utcMillis(),
          entryId = entryId,
        )
      )
    }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
    log.info("Publisher ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }

2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher ThreadId: 38
2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher done
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler start
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler ThreadId: 53
2021-05-13 13:06:54.505  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block start
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block ThreadId: 53
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block end
2021-05-13 13:06:55.540  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler done

However, I still don't understand the implications for the application under load, and it feels wrong to mix reactive operations with handlers that call runBlocking { }.

1 Answer

Reactor offers Sinks. You can use a Sinks.Many like an event bus. Have a look at the following example.

@Configuration
public class EventNotificationConfig {

    @Bean
    public Sinks.Many<EventNotification> eventNotifications() {
        // replay().latest() keeps the most recent event and replays it to each new subscriber.
        return Sinks.many().replay().latest();
    }

}

You create a Sink bean in a @Configuration class. It can be used to emit new events, and it can be turned into a Flux for subscribers.

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {

    private final @NonNull Sinks.Many<EventNotification> eventNotifications;


    /**
     * Provide a flux with our notifications.
     *
     * @return a Flux
     */
    public Flux<EventNotification> getNotifications() {
        return eventNotifications.asFlux();
    }

    /**
     * Emit a new event to the sink.
     *
     * @param eventId
     * @param status
     * @param payload
     */
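    public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
        // tryEmitNext does not throw; it reports the outcome as an EmitResult.
        final Sinks.EmitResult result = eventNotifications.tryEmitNext(EventNotification.builder()
          .eventId(eventId)
          .status(status)
          .payload(payload).build());
        if (result.isFailure()) {
            // Surface dropped events, e.g. when two threads emit concurrently.
            log.warn("could not emit event {}: {}", eventId, result);
        }
    }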

}

A single Sink instance is enough for the whole application. Subscribing to different kinds of events can be achieved with filters that the various subscribers apply to the Flux.
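To illustrate the "one shared stream, each subscriber filters" idea in isolation, here is a minimal sketch that uses only the JDK. It is not Reactor code: java.util.concurrent.SubmissionPublisher stands in for the Sink, and the names MiniEventBus, Event, listen and emit are hypothetical, chosen only for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical JDK-only stand-in for the single shared Sink; not Reactor code.
public class MiniEventBus {

    // Hypothetical event type with a status-like discriminator.
    public static final class Event {
        public final String type;
        public final long entryId;

        public Event(String type, long entryId) {
            this.type = type;
            this.entryId = entryId;
        }
    }

    // One shared publisher for all event kinds; delivery to subscribers is asynchronous.
    private final SubmissionPublisher<Event> publisher = new SubmissionPublisher<>();

    // Each listener sees every event and keeps only those matching its filter.
    // The returned future completes once the bus has been closed and drained.
    public CompletableFuture<Void> listen(Predicate<Event> filter, Consumer<Event> handler) {
        return publisher.consume(event -> {
            if (filter.test(event)) {
                handler.accept(event);
            }
        });
    }

    // The emitter knows nothing about who is listening.
    public void emit(Event event) {
        publisher.submit(event);
    }

    public void close() {
        publisher.close();
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        List<Long> deletedIds = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> done =
                bus.listen(e -> e.type.equals("ENTRY_DELETED"), e -> deletedIds.add(e.entryId));

        bus.emit(new Event("PING", 1L));
        bus.emit(new Event("ENTRY_DELETED", 42L));

        bus.close();   // signals completion after the pending events
        done.join();   // wait until the listener has processed everything
        System.out.println(deletedIds);
    }
}
```

One difference worth keeping in mind: SubmissionPublisher.submit blocks the caller when a subscriber's buffer is full, whereas tryEmitNext on a Reactor sink returns a result immediately, so the sketch only mirrors the decoupling and filtering, not the backpressure behaviour.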


@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {

    private final @NonNull NotificationUsecase notificationUsecase;


    /**
     * Start listening to events as soon as class EventListener
     * has been constructed.
     *
     * Listening will continue until the Flux emits a 'completed'
     * signal.
     */
    @PostConstruct
    public void init() {

        this.listenToPings()
                .subscribe();
        this.listenToDataFetched()
                .subscribe();
    }


    public Flux<EventNotification> listenToPings() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
                .doOnNext(notification -> log.info("received PING: {}", notification));
    }

    public Flux<EventNotification> listenToDataFetched() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {}", notification));
    }
}

Emitting new events is equally simple. Just call the emit method:

notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);