We are using Vert.x together with RxJava. Our application logic is working, however, the code, despite the use of RxJava, still contains callbacks which are nested several layers deep. Below is one example. All logging and additional logic has been removed in order to highlight the structure of the code.
EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(cosumedMsg -> {
Object msg = cosumedMsg.body().encode());
Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts));
step1.subscribe(validationRes -> {
Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts));
step2.subscribe(handlerRes -> {
Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts));
step3.subscribe(dsRes -> {
Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts));
step4.subscribe(msg -> {
msg.body();
}, err -> {
// step1 error handling
});
}, err -> {
// step2 error handling
});
}, err -> {
// step3 error handling
});
}, err -> {
// step4 error handling
});
Very possibly I am not using RxJava correctly. Is there a way to restructure the sample code using RaxJava's reactive features so as to avoid the nested callbacks?
Thanks