Tl;dr:

How do I signal onNext on a hot observable only AFTER I have subscribed to that same hot observable?

Longer version:

My goal is to create a REST endpoint that emits an event and then waits for a response (which is itself an event) before returning. In other words, an event-driven REST endpoint with Command/Response.

I have not found an existing solution, so I decided to do it with reactive Java (Reactor), mapping Spring events onto a DirectProcessor publisher (a hot observable). This is what I have:

https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive

@Slf4j
@Component
class EventDriver {
    private static final DirectProcessor<Object> EVENTS = DirectProcessor.create();

    @EventListener(Object.class)
    public void onEvent(Object event) {
        log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
        EVENTS.onNext(event);
    }

    // Not important, but it does show that my events are captured & processed via 'normal' subscription
    @EventListener(ApplicationStartedEvent.class)
    public void logger() {
        EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
    }

    public Flux<Object> getEvents() {
        log.info("Requesting processor.");
        return EVENTS
                .doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
    }
}

@RestController
@RequiredArgsConstructor
class ReservationRestController {
    private final ApplicationEventPublisher eventDispatcher;
    private final EventDriver eventDriver;

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
    public Flux<String> performAction(@PathVariable String name) {
        return eventDriver.getEvents()
                .doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
                .map(Object::toString)
                .log();
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    class MyEvent {
        private String name;
    }
}

My understanding is this: when I open the REST endpoint /wait-for-event/any, the reactive web method performAction should FIRST subscribe to the events and THEN dispatch the event (thus triggering the pipeline), because of doOnSubscribe, whose Javadoc reads: "Add behavior (side-effect) triggered when the {@link Flux} is done being subscribed".

Unfortunately, the event gets lost: the subscription never sees it. Here is the log:

 : Requesting processor.
 : Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
 : Event ReservationRestController.MyEvent(name=as3) received by processor.
 : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
 : | request(1)

As far as I can tell from the log, I'm dispatching the event before onSubscribe is actually called.

Of course, any event that is published afterwards is caught correctly.
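
The ordering problem described above can be reproduced without Spring or Reactor. The following is only a sketch: HotSource and HotSourceOrderingDemo are hypothetical names, and the class is a plain-Java stand-in, not Reactor's DirectProcessor. It assumes, as the log suggests, that the on-subscribe hook runs before the subscriber is actually registered for delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot publisher: no buffering, no replay.
class HotSource<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Assumption: the hook fires BEFORE the subscriber is registered,
    // which is the ordering the log in the question suggests.
    void subscribe(Consumer<T> subscriber, Runnable onSubscribeHook) {
        onSubscribeHook.run();
        subscribers.add(subscriber);
    }

    // Delivers synchronously to whoever is registered right now.
    void emit(T value) {
        subscribers.forEach(s -> s.accept(value));
    }
}

class HotSourceOrderingDemo {
    // Publishes from inside the hook: the first event has no receiver yet.
    static List<String> publishInsideHook() {
        HotSource<String> source = new HotSource<>();
        List<String> received = new ArrayList<>();
        source.subscribe(received::add, () -> source.emit("MyEvent(name=as3)"));
        source.emit("MyEvent(name=later)"); // later events do arrive
        return received;
    }

    // Publishes only after subscribe() has returned: the event is delivered.
    static List<String> subscribeThenPublish() {
        HotSource<String> source = new HotSource<>();
        List<String> received = new ArrayList<>();
        source.subscribe(received::add, () -> { });
        source.emit("MyEvent(name=as3)");
        return received;
    }

    public static void main(String[] args) {
        System.out.println("publish inside hook:    " + publishInsideHook());
        System.out.println("subscribe then publish: " + subscribeThenPublish());
    }
}
```

Under that assumption, the first variant only ever sees the later event, which matches the behaviour in the log: the event published from the hook goes nowhere, while anything published after the subscription is in place arrives normally.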

1 Answer

I returned to this issue after some time. The solution was to subscribe to the events first and only then zip with the dispatcher:

public String wtfperformAction(@PathVariable String name) {
        return Mono.from(
                eventDriver.getEvents()
                        .map(Object::toString)
                        .filter(b -> b.contains(name))
                        .log("Event source")
        )
                .zipWith(Mono.just(name)
                        .map(MyEvent::new)
                        .doOnNext(eventDispatcher::publishEvent)
                        .log("Publisher"))
                .map(Tuple2::getT1)
                .log("Result")
                .block();
    }
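
The same "subscribe first, publish second, then wait for the matching event" ordering can also be sketched with only the JDK. This is not the Reactor code above and not a Spring API: SketchEventBus and CommandResponseSketch are hypothetical names, the bus is a simple in-memory stand-in for Spring events plus the processor, and the one-second timeout is an arbitrary choice for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-memory event bus: synchronous delivery, no replay.
class SketchEventBus {
    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener and returns a handle that removes it again.
    Runnable subscribe(Consumer<Object> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    void publish(Object event) {
        listeners.forEach(l -> l.accept(event));
    }
}

class CommandResponseSketch {
    static String performAction(SketchEventBus bus, String name) {
        CompletableFuture<String> response = new CompletableFuture<>();

        // 1. Subscribe first, keeping only events that mention the name
        //    (mirrors the filter(b -> b.contains(name)) step above).
        Runnable unsubscribe = bus.subscribe(event -> {
            String text = event.toString();
            if (text.contains(name)) {
                response.complete(text);
            }
        });
        try {
            // 2. Publish only once the listener is registered.
            bus.publish("MyEvent(name=" + name + ")");
            // 3. Wait for the matching event, but not forever.
            return response.orTimeout(1, TimeUnit.SECONDS).join();
        } finally {
            unsubscribe.run(); // avoid leaking one listener per request
        }
    }

    public static void main(String[] args) {
        System.out.println(performAction(new SketchEventBus(), "as3"));
    }
}
```

Like block() in the code above, join() parks the calling thread, so this shape only fits a blocking endpoint.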