1
votes

We are using Vert.x together with RxJava. Our application logic works, but the code, despite the use of RxJava, still contains callbacks nested several layers deep. Below is one example. All logging and additional logic have been removed to highlight the structure of the code.

EventBus eb = vertx.eventBus();
// retrieve incoming messages from the consumer by listening on the event bus
MessageConsumer<BusMessage> consumer = eb.<BusMessage>consumer(address);
Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
consumerObservable.subscribe(consumedMsg -> {
  Object msg = consumedMsg.body().encode();
  Single<Message<MyObject>> step1 = eb.<MyObject>rxSend(addrs1, msg, deliveryOpts);
  step1.subscribe(validationRes -> {
    Single<Message<MyObject>> step2 = eb.rxSend(addrs2, msg, deliveryOpts);
    step2.subscribe(handlerRes -> {
      Single<Message<MyObject>> step3 = eb.rxSend(addrs3, msg, deliveryOpts);
      step3.subscribe(dsRes -> {
        Single<Message<MyObject>> step4 = eb.rxSend(addrs4, msg, deliveryOpts);
        step4.subscribe(finalRes -> {
          finalRes.body();
        }, err -> {
          // step4 error handling
        });
      }, err -> {
        // step3 error handling
      });
    }, err -> {
      // step2 error handling
    });
  }, err -> {
    // step1 error handling
  });
});

Quite possibly I am not using RxJava correctly. Is there a way to restructure the sample code using RxJava's reactive features so as to avoid the nested callbacks?

Thanks

2 Answers

2
votes

In my opinion, you should have only one subscribe(), at the end of your Rx chain. Chain your event bus calls with flatMap() operators instead (here flatMapSingle(), since rxSend() returns a Single). Something like this:

    public void start() throws Exception {
        EventBus eventBus = vertx.eventBus();
        MessageConsumer<BusMessage> consumer = eventBus.consumer("my.address");
        Observable<Message<BusMessage>> consumerObservable = consumer.toObservable();
        consumerObservable
                .flatMapSingle(consumerMsg -> {
                    final Object msg = consumerMsg.body().encode();
                    return eventBus.rxSend("addrs1", msg, new DeliveryOptions());
                })
                // forward the reply body, not the Message wrapper, to the next step
                .flatMapSingle(step1Msg -> eventBus.rxSend("addrs2", step1Msg.body(), new DeliveryOptions()))
                .flatMapSingle(step2Msg -> eventBus.rxSend("addrs3", step2Msg.body(), new DeliveryOptions()))
                .subscribe(this::handleFinalMsg,
                           this::handleException,
                           this::handleOnComplete);
    }

    private void handleFinalMsg(final Message<Object> message) {
        System.out.println(message.body());
    }

    private void handleException(final Throwable throwable) {
        System.err.println(throwable);
    }

    private void handleOnComplete() {
        System.out.println("it's the end!");
    }

Any exception raised at any step of the chain is handled in one place, by this::handleException.
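To make the single-error-handler behaviour concrete without needing a running Vert.x instance, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for Single, with thenCompose playing the role of flatMapSingle. The send() helper, the failAt parameter and the class name are hypothetical and only imitate an event bus request; this is not the Vert.x API. It illustrates that a failure in the middle of the chain skips the remaining steps and lands in the one handler at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture plays the role of Single, and
// thenCompose plays the role of flatMapSingle. No Vert.x or RxJava involved.
class SingleErrorHandlerSketch {

    // Hypothetical replacement for an event bus request: replies with the
    // payload plus the address, or fails when the address equals failAt.
    static CompletableFuture<String> send(String address, String payload,
                                          String failAt, List<String> calls) {
        calls.add(address); // record which steps were actually invoked
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (address.equals(failAt)) {
            reply.completeExceptionally(new IllegalStateException(address + " failed"));
        } else {
            reply.complete(payload + ">" + address);
        }
        return reply;
    }

    // Runs the three-step chain and returns the final reply or the handled error.
    static String run(String failAt, List<String> calls) {
        return send("addrs1", "msg", failAt, calls)
                .thenCompose(r1 -> send("addrs2", r1, failAt, calls))
                .thenCompose(r2 -> send("addrs3", r2, failAt, calls))
                // the only error handler for the whole chain
                .exceptionally(err -> "handled: " + rootCause(err).getMessage())
                .join();
    }

    // Dependent stages may wrap the failure in a CompletionException, so unwrap it.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        System.out.println(run(null, calls));     // all three steps succeed
        calls.clear();
        System.out.println(run("addrs2", calls)); // second step fails
        System.out.println(calls);                // addrs3 was never called
    }
}
```

One difference worth keeping in mind: in the Rx version, an error also terminates the subscription to the consumer, so later incoming messages are no longer processed unless the error is recovered inside the inner chain.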

You can have a look at this blog post: https://streamdata.io/blog/vert-x-and-the-async-calls-chain/ (and its associated code), and maybe at this code: https://github.com/ctranxuan/howtos-vertx/tree/master/eventbus-rxjava, for further insight.

I hope this helps.

-1
votes

RxJava provides many operators for modelling your logic in a functional style, so these nested callbacks can easily be turned into a chain of operators. You don't need to catch the error at each operator, because Rx already propagates errors down the stream, so you can handle them in one place and react accordingly. Error handling is a big topic in Rx, but the basic idea is this:

  • convert each logic step into an Rx step using the right operator.
  • take the error handling out of each callback and move it down the stream.
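If the per-step error handling from the question still matters, one possible way to keep it while handling errors in a single place is to tag each failure with the step it came from. The sketch below again uses the JDK's CompletableFuture as a stand-in for Single, so it runs without Vert.x or RxJava. StepException, tagged(), call() and the step names "validation", "handler" and "datastore" (guessed from the question's lambda parameter names) are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: each step's failure is tagged before it flows downstream,
// so one handler at the end can still tell which step went wrong.
class StepErrorSketch {

    // Hypothetical exception type that records which step failed.
    static final class StepException extends RuntimeException {
        final String step;

        StepException(String step, Throwable cause) {
            super(step + ": " + cause.getMessage(), cause);
            this.step = step;
        }
    }

    // Wraps a single step so that its own failure carries the step name.
    static <T> CompletableFuture<T> tagged(String step, CompletableFuture<T> call) {
        return call.handle((value, err) -> {
            if (err != null) {
                throw new StepException(step, err);
            }
            return value;
        });
    }

    // Hypothetical stand-in for an event bus request.
    static CompletableFuture<String> call(String step, String failingStep) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (step.equals(failingStep)) {
            reply.completeExceptionally(new IllegalStateException("no reply"));
        } else {
            reply.complete(step + " ok");
        }
        return reply;
    }

    static String run(String failingStep) {
        return tagged("validation", call("validation", failingStep))
                .thenCompose(r -> tagged("handler", call("handler", failingStep)))
                .thenCompose(r -> tagged("datastore", call("datastore", failingStep)))
                // one place for error handling, with a per-step reaction
                .handle((result, err) -> {
                    if (err == null) {
                        return result;
                    }
                    StepException failure = findStep(err);
                    return failure == null ? "unknown failure" : "failed at " + failure.step;
                })
                .join();
    }

    // Walks the cause chain, since dependent stages may wrap the exception.
    static StepException findStep(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof StepException) {
                return (StepException) c;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(run(null));
        System.out.println(run("handler"));
    }
}
```

In RxJava itself, the same idea would presumably be expressed with an error-mapping operator such as onErrorResumeNext on each Single before it is returned from flatMap.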