0
votes

I am trying to read from 2 different Kafka topics using 2 different KafkaListeners and getting an error whenever the second KafkaListener is invoked. I get a org.springframework.messaging.converter.MessageConversionException error. It seems that the second listener is read a message that is specific to the first listener and topic.

I have a Kafka topics that i am listening to. The message contents are then written to a websocket channel where i have a SockJS client that has subscribed to the channel. This works perfectly. I then created a new topic and then added a second KafkaListener. However when the secong listener is invoked, i see that it is attempting to process/read a payload that corresponds to the first KafkaListener and topic and since it's not configured to do so, a MessageConversionException error is thrown.

Models
------
@JsonPropertyOrder({ "ackdate", "ack_count" })
public class DailyTransfer {
    private String ackdate;
    private Long ack_count;

    public DailyTransfer() {}

    public DailyTransfer(String ackdate, Long ack_count) {
        this.ackdate = ackdate;
        this.ack_count = ack_count;
    }

    ... Getters and Setters omitted for brevity

    @Override
    public String toString() {
        return "DailyTransfer{" +
                "ackdate='" + ackdate + '\'' +
                ", ack_count=" + ack_count +
                '}';
    }
}

@JsonPropertyOrder({ "rgdno", "bizname", "tin", "incordate", "commencedate", "biz_pk", "ack_at", "ack_at_ms", "ack_message" })
public class BizAck {

    private String rgdno;
    private String ack_message;
    private String bizname;
    private String tin;
    private String incordate;
    private String commencedate;
    private Long biz_pk;
    private String ack_at;
    private Long ack_at_ms;

    public BizAck() {}

    public BizAck(String rgdno, String ack_message, String bizname, String tin, String incordate, String commencedate, Long biz_pk, String ack_at,
                    Long ack_at_ms) {
        this.rgdno = rgdno;
        this.ack_message = ack_message;
        this.bizname = bizname;
        this.tin = tin;
        this.incordate = incordate;
        this.commencedate = commencedate;
        this.biz_pk = biz_pk;
        this.ack_at = ack_at;
        this.ack_at_ms = ack_at_ms;
    }

    ... Getters and Setters omitted for brevity

    @Override
    public String toString() {
        return "BizAck{" +
                "rgdno='" + rgdno + '\'' +
                ", ack_message='" + ack_message + '\'' +
                ", bizname='" + bizname + '\'' +
                ", tin='" + tin + '\'' +
                ", incordate='" + incordate + '\'' +
                ", commencedate='" + commencedate + '\'' +
                ", biz_pk=" + biz_pk +
                ", ack_at='" + ack_at + '\'' +
                ", ack_at_ms=" + ack_at_ms +
                '}';
    }
}

Configuration
-------------
@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> cprops = new HashMap<>();
        cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        cprops.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
        cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        cprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return cprops;
    }

@Bean
    public ConsumerFactory<String, BizAck> bizAckConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(BizAck.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BizAck> bizAckKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BizAck> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(bizAckConsumerFactory());
        return factory;
    }

@Bean
    public ConsumerFactory<String , DailyTransfer> consumerFactoryDailyTransfer(){
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("daily.transfer.consumer.group-id"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(DailyTransfer.class));
    }

@Bean(name="kafkaListenerContainerFactoryDailyTransfer")
    public ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> kafkaListenerContainerFactoryDailyTransfer() {
        ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryDailyTransfer());
        return factory;
    }

Listeners
---------
// listener to consume BizAck messages
    @KafkaListener( topics = "${spring.kafka.json.topic}", containerFactory = "bizAckKafkaListenerContainerFactory",
            groupId="${spring.kafka.consumer.group-id}")
    public void ssnitAckListener(BizAck bizAck) {
        logger.info("Received message='{}' from Kafka Topic", bizAck.toString());
        this.simpMessagingTemplate.convertAndSend("/bizTransfers/pushNotification", bizAck);
    }

// listener to consume DailyTransfer messages
@KafkaListener( topics="${spring.kafka.json.topic2}", containerFactory="kafkaListenerContainerFactoryDailyTransfer",
            groupId="${daily.transfer.consumer.group-id}" )
    public void dailyTransferListener(DailyTransfer dailyTransfer) {
        logger.info("Received message='{}' from transfer summary count Kafka Topic", dailyTransfer.toString());
        this.simpMessagingTemplate.convertAndSend("/summaryCounts/pushNotification", dailyTransfer);
    }

The first listener, listener to consume BizAck messages, works perfectly. See below from logs

INFO 9708 --- [ntainer#1-0-C-1] g.g.g.s.n.kafka.BizAckTopicListener : Received message='BizAck{rgdno='CS006192018', ack_message='Business registration received for : CS006192018', bizname='DASEL ENGINEERING COMPANY LIMITED', tin='C0010143181', incordate='09-JAN-2018', commencedate='09-JAN-2018', biz_pk=3667, ack_at='2019-04-23T08:51:02.684Z', ack_at_ms=1556009462684}' from Kafka Topic

but the second listener, listener to consume DailyTransfer messages, throws an error.

ERROR 9708 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = DAILY_TRANSFER_COUNTS, partition = 3, offset = 173, CreateTime = 1556009462652, serialized key size = 10, serialized value size = 51, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2019-04-23, value = BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void gh.biztransfers.notification.kafka.DailyTransferTopicListener.dailyTransferListener(gh.biztransfers.notification.model.DailyTransfer)] Bean [gh.biztransfers.notification.kafka.DailyTransferTopicListener@14bd03ea]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}], failedMessage=GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}], failedMessage=GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1311) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1300) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1227) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}], failedMessage=GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] ... 8 common frames omitted Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}] at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaHandlerMethodFactoryAdapter$1.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:840) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE] ... 13 common frames omitted

Why is the second listener picking BizAck messages and attempting to convert/process them?

**Sending Configuration Excerpt

@Autowired
    private KafkaTemplate<String, BizAck> bizAckKafkaTemplate;
    public void sendAcknowledgementMessage(String rcvdMessage) {
        BizAck bizAck = utils.JsonStr2BizAck(rcvdMessage);
        logger.info("Sending acknowledgement message to Kafka for : \n"+ "Biz Regn: "+ bizAck.getRgdno() +", TIN : " + bizAck.getTin()+", Name: " + bizAck.getBizname());
        // the KafkaTemplate provides asynchronous send methods returning a Future
        ListenableFuture<SendResult<String, BizAck>> future = bizAckKafkaTemplate.send(Objects.requireNonNull(env.getProperty("spring.kafka.json.topic")), bizAck);
        // register a callback with the listener to receive the result of the send asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<String, BizAck>>() {
            @Override
            public void onSuccess(SendResult<String, BizAck> result) {
                logger.info("Successfully sent message=[ " + rcvdMessage + " ] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                logger.info("Unable to send message=[ " + rcvdMessage + " ] due to : " + ex.getMessage());
            }
        });
    }

Sending Logs Excerpt

2019-04-23 11:06:02.999 INFO 9708 --- [enerContainer-1] g.g.g.s.n.kafka.AcknowledgementSender : Sending received acknowledgement message to Kafka for : Biz Regn: CG094562018, TIN : C0910331870, Name: COMMUNITIES FOR DEVELOPMENT 2019-04-23 11:06:02.999 INFO 9708 --- [ad | producer-1] g.g.g.s.n.kafka.AcknowledgementSender : Successfully sent message=[ {"rgdno":"CG094562018","bizname":"COMMUNITIES FOR DEVELOPMENT","tin":"C0910331870","incordate":"16-JAN-2018","commencedate":"16-JAN-2018","biz_pk":3800,"ack_at":"2019-04-23T11:06:02.858Z","ack_at_ms":1556017562858,"ack_message":"Biz regn received for : CG002642018"} ] with offset=[3556] I send these messages successfully to a topic other than DAILY_TRANSFER_COUNTS. The DAILY_TRANSFER_COUNTS topic is derived from a query executed in KSQL on this topic.**

1
Just verify using kafka tool what type of messages each topic has? - Deadpool

1 Answers

0
votes

Well clearly, the second topic has a BizAck...

ConsumerRecord(topic = DAILY_TRANSFER_COUNTS, ... value = BizAck{...

So the problem seems to be on the sending side.