TL;DR:
How do I signal onNext on a hot observable only AFTER I have subscribed to that same hot observable?
Longer version:
My goal is to create a REST endpoint that emits an Event and then waits for a response (which is itself an Event) before returning. Basically, an event-driven REST endpoint with Command/Response.
I have not found an existing solution, so I decided to do it with Project Reactor by mapping Spring events onto a DirectProcessor (a hot publisher). This is what I have:
https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive
@Slf4j
@Component
class EventDriver {
    private static DirectProcessor<Object> EVENTS = DirectProcessor.create();

    @EventListener(Object.class)
    public void onEvent(Object event) {
        log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
        EVENTS.onNext(event);
    }

    // Not important, but it does show that my events are captured & processed via 'normal' subscription
    @EventListener(ApplicationStartedEvent.class)
    public void logger() {
        EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
    }

    public Flux<Object> getEvents() {
        log.info("Requesting processor.");
        return EVENTS
                .doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
    }
}
@RestController
@RequiredArgsConstructor
class ReservationRestController {
    private final ApplicationEventPublisher eventDispatcher;
    private final EventDriver eventDriver;

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
    public Flux<String> performAction(@PathVariable String name) {
        return eventDriver.getEvents()
                .doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
                .map(Object::toString)
                .log();
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    class MyEvent {
        private String name;
    }
}
My understanding is: when I open the REST endpoint /wait-for-event/any, the reactive web method performAction should FIRST subscribe to the events and THEN dispatch the event (thus triggering the pipeline), because of doOnSubscribe ("Add behavior (side-effect) triggered when the {@link Flux} is done being subscribed").
Unfortunately, the event gets stuck: the subscription does not see it. Here is the log:
: Requesting processor.
: Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
: Event ReservationRestController.MyEvent(name=as3) received by processor.
: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
: | request(1)
As far as I can tell from the log, the event is dispatched before onSubscribe is actually called.
Of course, any event published after that point is caught correctly.
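To make the suspected ordering concrete, here is a minimal plain-Java sketch of what I think is happening. It is a simplified model, not Reactor's actual code: HotSourceModel, HotSource, Listener and received are hypothetical names I made up, and the order "call onSubscribe first, register the subscriber afterwards" is my assumption based on the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified, hypothetical model of a hot source; this is NOT Reactor's DirectProcessor.
public class HotSourceModel {

    /** Minimal subscriber: a subscribe hook plus an item handler. */
    public interface Listener {
        void onSubscribe();
        void onNext(Object item);
    }

    /** Hot source: items reach only the listeners registered at the time of onNext. */
    public static final class HotSource {
        private final List<Listener> listeners = new CopyOnWriteArrayList<>();
        private final boolean registerBeforeSignal;

        public HotSource(boolean registerBeforeSignal) {
            this.registerBeforeSignal = registerBeforeSignal;
        }

        public void subscribe(Listener listener) {
            if (registerBeforeSignal) {
                listeners.add(listener);   // registered first, so the hook's item is delivered
                listener.onSubscribe();
            } else {
                listener.onSubscribe();    // assumed order: the hook runs first...
                listeners.add(listener);   // ...and only now can the listener receive items
            }
        }

        public void onNext(Object item) {
            for (Listener listener : listeners) {
                listener.onNext(item);
            }
        }
    }

    /** Publishes "first" from the subscribe hook, "second" afterwards; returns what was seen. */
    public static List<Object> received(boolean registerBeforeSignal) {
        HotSource source = new HotSource(registerBeforeSignal);
        List<Object> seen = new ArrayList<>();
        source.subscribe(new Listener() {
            @Override public void onSubscribe() { source.onNext("first"); }
            @Override public void onNext(Object item) { seen.add(item); }
        });
        source.onNext("second"); // emitted after subscribe() has returned
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(received(false)); // prints [second]
        System.out.println(received(true));  // prints [first, second]
    }
}
```

With registerBeforeSignal set to false, the item published from the subscribe hook is dropped and only later items arrive, which matches what I see in my log. What I am looking for is the behaviour of the second case.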