1
votes

Hi all, I am trying to build event-driven microservices. So far, I can consume a message from Kafka and update a database record when the message is received, using Quarkus and the SmallRye Reactive Messaging extension. Next, I want to send a message to another topic on success and to an error topic otherwise. I know that a method can return a value and use the @Outgoing annotation to emit a new message, but I don't think that fits my use case. I need some guidance here: if an error happens while consuming a message, should I return the message to the original topic (by not acknowledging the message), or should I consume it and produce an error message to a different topic to roll back the original transaction?

Here is my code:

    @Incoming("new-payment")
    public void newMessage(String msg) {
        LOG.info("New payment has been received.");
        LOG.info("Payload is {}", msg);
        PaymentEvent pe = jsob.fromJson(msg, PaymentEvent.class);

        // Read the current buyer count for this book.
        mysqlPool.preparedQuery("select totalBuyers from Book where isbn = ?",
                    Tuple.of(pe.getIsbn()))
            .thenApply(rs -> {
                RowIterator<Row> iterator = rs.iterator();
                if (iterator.hasNext()) {
                    return iterator.next().getInteger(0) + 1;
                } else {
                    return Integer.valueOf(0);
                }
            })
            // thenCompose rather than thenApply: the update is itself asynchronous,
            // so the following stage should only run once it has finished.
            // The where clause limits the update to the book that was paid for.
            .thenCompose(totalCount ->
                mysqlPool.preparedQuery("update Book set totalBuyers = ? where isbn = ?",
                        Tuple.of(totalCount, pe.getIsbn())))
            .whenComplete((rs, err) -> {
                if (err != null) {
                    //Emit an error to error topic.
                } else {
                    //Emit a msg to other service.
                }
            });
    }

Also, if you have better code, please share it. I am still a newbie in reactive programming :).

2 Answers

1
votes

I've been doing enterprise integration for years, and I think you would want to do both.

Should I return message to the original topic (by not acknowledging the message) or should I consume it and produce error message to different topic to rollback the original transaction.

The event should remain on the topic so that another instance can pick it up and process it, and the error should be logged as an event. The same consumer might even pick the event up again and reprocess it successfully.

An EDA (event-driven architecture) may offer different ways to handle this, but on an ESB the message would be marked as tried. Generally, after three failed attempts it would be sent to a dead-letter queue so that it can be corrected and reprocessed later.
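The "three attempts, then dead-letter queue" idea can be sketched in plain Java. This is only an illustration under stated assumptions: the class RetryThenDeadLetter, its process method, and the in-memory lists standing in for the error log and the dead-letter queue are hypothetical names, not part of Quarkus, SmallRye, or Kafka. The limit of three is the ESB convention mentioned above, not a framework default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: retries a handler a fixed number of times and
// parks the message in a dead-letter list if every attempt fails.
final class RetryThenDeadLetter {

    static final int MAX_ATTEMPTS = 3; // assumption: three tries, as on the ESB

    // In-memory stand-ins for an error topic and a dead-letter queue.
    final List<String> errorLog = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    /** Returns true if the handler succeeded within MAX_ATTEMPTS. */
    boolean process(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                handler.accept(message);
                return true; // success: the message can be acknowledged
            } catch (RuntimeException e) {
                // Each failure is recorded as an error event.
                errorLog.add("attempt " + attempt + " failed for "
                        + message + ": " + e.getMessage());
            }
        }
        // All attempts failed: keep the message for later correction.
        deadLetters.add(message);
        return false;
    }
}
```

In a real deployment the two lists would be replaced by producers for an error topic and a dead-letter topic. How attempts are counted there depends on the broker and client configuration.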

Our enterprise is also starting to design and build applications using EDA, so I am interested to read what others have to say on this question. And kudos to you for focusing on Quarkus. I believe it is one of the best technologies to come from Red Hat that I have seen yet!

0
votes

Another problem with this approach is that you are doing “2 writes in 1 service”, i.e. one call to the db and another one to a topic. This becomes problematic when one of the 2 writes fails while the other succeeds.

If you want to avoid this and use a pure event-driven approach, you need to reorder your events so that writing to the db is the last step in the whole flow. That way no single service performs 2 writes.

So in your case: change the second stage of the chain (the one that runs the update) so that, instead of updating the db, it fires a new event to another topic. The consumer of this new topic should then do the db update. The flow becomes:

Producer -> topic1 -> consumer (select from ...) & fire event to another topic -> topic2 -> consumer (update table).
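A minimal in-memory sketch of that flow, assuming nothing about the real Kafka or database setup: the class TwoStageFlow, the UpdateBuyers event, and the queue and map fields are all hypothetical stand-ins for the two topics and the Book table. It only shows that each consumer performs a single write.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the flow: queues stand in for Kafka topics
// and a map stands in for the Book table.
final class TwoStageFlow {

    /** Event published by the first consumer and applied by the second. */
    static final class UpdateBuyers {
        final String isbn;
        final int totalBuyers;

        UpdateBuyers(String isbn, int totalBuyers) {
            this.isbn = isbn;
            this.totalBuyers = totalBuyers;
        }
    }

    final Queue<String> topic1 = new ArrayDeque<>();        // payment events (ISBN)
    final Queue<UpdateBuyers> topic2 = new ArrayDeque<>();  // update events
    final Map<String, Integer> bookTable = new HashMap<>(); // isbn -> totalBuyers

    /** First consumer: reads the count and publishes. Its only write is to topic2. */
    void consumePayments() {
        String isbn;
        while ((isbn = topic1.poll()) != null) {
            int next = bookTable.getOrDefault(isbn, 0) + 1;
            topic2.add(new UpdateBuyers(isbn, next));
        }
    }

    /** Second consumer: applies the update. Its only write is to the table. */
    void consumeUpdates() {
        UpdateBuyers event;
        while ((event = topic2.poll()) != null) {
            bookTable.put(event.isbn, event.totalBuyers);
        }
    }
}
```

One caveat with this sketch: the count is read in the first stage and written in the second, so two payments for the same ISBN that are consumed before the first update lands would compute the same value. Publishing an increment instead of an absolute count is one way to avoid that.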