I am trying to read from two different Kafka topics using two different KafkaListeners, and I get an org.springframework.messaging.converter.MessageConversionException whenever the second KafkaListener is invoked. It seems that the second listener is reading a message that belongs to the first listener and topic.
I have a Kafka topic that I am listening to. The message contents are then written to a WebSocket channel to which a SockJS client has subscribed. This works perfectly. I then created a new topic and added a second KafkaListener. However, when the second listener is invoked, I see that it is attempting to process a payload that corresponds to the first KafkaListener and topic, and since it is not configured to do so, a MessageConversionException is thrown.
Models
------
    @JsonPropertyOrder({ "ackdate", "ack_count" })
    public class DailyTransfer {

        private String ackdate;
        private Long ack_count;

        public DailyTransfer() {}

        public DailyTransfer(String ackdate, Long ack_count) {
            this.ackdate = ackdate;
            this.ack_count = ack_count;
        }

        // ... getters and setters omitted for brevity

        @Override
        public String toString() {
            return "DailyTransfer{" +
                    "ackdate='" + ackdate + '\'' +
                    ", ack_count=" + ack_count +
                    '}';
        }
    }

    @JsonPropertyOrder({ "rgdno", "bizname", "tin", "incordate", "commencedate", "biz_pk", "ack_at", "ack_at_ms", "ack_message" })
    public class BizAck {

        private String rgdno;
        private String ack_message;
        private String bizname;
        private String tin;
        private String incordate;
        private String commencedate;
        private Long biz_pk;
        private String ack_at;
        private Long ack_at_ms;

        public BizAck() {}

        public BizAck(String rgdno, String ack_message, String bizname, String tin, String incordate,
                      String commencedate, Long biz_pk, String ack_at, Long ack_at_ms) {
            this.rgdno = rgdno;
            this.ack_message = ack_message;
            this.bizname = bizname;
            this.tin = tin;
            this.incordate = incordate;
            this.commencedate = commencedate;
            this.biz_pk = biz_pk;
            this.ack_at = ack_at;
            this.ack_at_ms = ack_at_ms;
        }

        // ... getters and setters omitted for brevity

        @Override
        public String toString() {
            return "BizAck{" +
                    "rgdno='" + rgdno + '\'' +
                    ", ack_message='" + ack_message + '\'' +
                    ", bizname='" + bizname + '\'' +
                    ", tin='" + tin + '\'' +
                    ", incordate='" + incordate + '\'' +
                    ", commencedate='" + commencedate + '\'' +
                    ", biz_pk=" + biz_pk +
                    ", ack_at='" + ack_at + '\'' +
                    ", ack_at_ms=" + ack_at_ms +
                    '}';
        }
    }
Configuration
-------------
    // Shared consumer properties, used by the BizAck consumer factory
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> cprops = new HashMap<>();
        cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        cprops.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
        cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        cprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return cprops;
    }

    @Bean
    public ConsumerFactory<String, BizAck> bizAckConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(BizAck.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BizAck> bizAckKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BizAck> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(bizAckConsumerFactory());
        return factory;
    }

    // Separate consumer factory and container factory for DailyTransfer messages
    @Bean
    public ConsumerFactory<String, DailyTransfer> consumerFactoryDailyTransfer() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("daily.transfer.consumer.group-id"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(DailyTransfer.class));
    }

    @Bean(name = "kafkaListenerContainerFactoryDailyTransfer")
    public ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> kafkaListenerContainerFactoryDailyTransfer() {
        ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryDailyTransfer());
        return factory;
    }
Listeners
---------
    // listener to consume BizAck messages
    @KafkaListener(topics = "${spring.kafka.json.topic}",
            containerFactory = "bizAckKafkaListenerContainerFactory",
            groupId = "${spring.kafka.consumer.group-id}")
    public void ssnitAckListener(BizAck bizAck) {
        logger.info("Received message='{}' from Kafka Topic", bizAck.toString());
        this.simpMessagingTemplate.convertAndSend("/bizTransfers/pushNotification", bizAck);
    }

    // listener to consume DailyTransfer messages
    @KafkaListener(topics = "${spring.kafka.json.topic2}",
            containerFactory = "kafkaListenerContainerFactoryDailyTransfer",
            groupId = "${daily.transfer.consumer.group-id}")
    public void dailyTransferListener(DailyTransfer dailyTransfer) {
        logger.info("Received message='{}' from transfer summary count Kafka Topic", dailyTransfer.toString());
        this.simpMessagingTemplate.convertAndSend("/summaryCounts/pushNotification", dailyTransfer);
    }
The first listener, which consumes BizAck messages, works perfectly, as this log entry shows:
INFO 9708 --- [ntainer#1-0-C-1] g.g.g.s.n.kafka.BizAckTopicListener : Received message='BizAck{rgdno='CS006192018', ack_message='Business registration received for : CS006192018', bizname='DASEL ENGINEERING COMPANY LIMITED', tin='C0010143181', incordate='09-JAN-2018', commencedate='09-JAN-2018', biz_pk=3667, ack_at='2019-04-23T08:51:02.684Z', ack_at_ms=1556009462684}' from Kafka Topic
However, the second listener, which consumes DailyTransfer messages, throws an error:
ERROR 9708 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = DAILY_TRANSFER_COUNTS, partition = 3, offset = 173, CreateTime = 1556009462652, serialized key size = 10, serialized value size = 51, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2019-04-23, value = BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void gh.biztransfers.notification.kafka.DailyTransferTopicListener.dailyTransferListener(gh.biztransfers.notification.model.DailyTransfer)]
Bean [gh.biztransfers.notification.kafka.DailyTransferTopicListener@14bd03ea]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}], failedMessage=GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}], failedMessage=GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1311) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1300) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1227) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}], failedMessage=GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [gh.biztransfers.notification.model.BizAck] to [gh.biztransfers.notification.model.DailyTransfer] for GenericMessage [payload=BizAck{rgdno='null', ack_message='null', bizname='null', tin='null', incordate='null', commencedate='null', biz_pk=null, ack_at='null', ack_at_ms=null}, headers={kafka_offset=173, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@db7f48, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=2019-04-23, kafka_receivedPartitionId=3, kafka_receivedTopic=DAILY_TRANSFER_COUNTS, kafka_receivedTimestamp=1556009462652}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaHandlerMethodFactoryAdapter$1.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:840) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 13 common frames omitted
Why is the second listener picking up BizAck messages and attempting to convert and process them?
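For what it's worth, the all-null BizAck in the error log looks like what I would expect if a DAILY_TRANSFER_COUNTS value were bound to BizAck as the target type while unknown JSON properties were skipped. I have not confirmed that this is what happens inside the deserializer; the plain-Java sketch below only imitates that behaviour. The bind helper, the trimmed BizAck class, and the payload (including the count of 42) are hypothetical stand-ins, not my real code or data.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

public class NullFieldsSketch {

    // Trimmed-down stand-in for my BizAck model (three of its fields).
    static class BizAck {
        String rgdno;
        String tin;
        Long biz_pk;
    }

    // Hypothetical binder: copies map entries onto same-named fields and
    // silently skips keys that the target class does not declare.
    static <T> T bind(Map<String, Object> json, Class<T> type)
            throws ReflectiveOperationException {
        T target = type.getDeclaredConstructor().newInstance();
        for (Field field : type.getDeclaredFields()) {
            if (json.containsKey(field.getName())) {
                field.setAccessible(true);
                field.set(target, json.get(field.getName()));
            }
        }
        return target;
    }

    // Assumed shape of a DAILY_TRANSFER_COUNTS value; the count is made up.
    static Map<String, Object> dailyTransferPayload() {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("ackdate", "2019-04-23");
        json.put("ack_count", 42L);
        return json;
    }

    public static void main(String[] args) throws Exception {
        BizAck bound = bind(dailyTransferPayload(), BizAck.class);
        // No key matches a BizAck field, so every field keeps its default.
        System.out.println(bound.rgdno + ", " + bound.tin + ", " + bound.biz_pk);
        // prints "null, null, null"
    }
}
```

If that reading is right, the open point is why the consumer behind the DailyTransfer listener ends up producing BizAck values in the first place.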
Sending Configuration Excerpt
-----------------------------
    @Autowired
    private KafkaTemplate<String, BizAck> bizAckKafkaTemplate;

    public void sendAcknowledgementMessage(String rcvdMessage) {
        BizAck bizAck = utils.JsonStr2BizAck(rcvdMessage);
        logger.info("Sending acknowledgement message to Kafka for : \n" + "Biz Regn: " + bizAck.getRgdno()
                + ", TIN : " + bizAck.getTin() + ", Name: " + bizAck.getBizname());
        // the KafkaTemplate provides asynchronous send methods returning a Future
        ListenableFuture<SendResult<String, BizAck>> future = bizAckKafkaTemplate.send(
                Objects.requireNonNull(env.getProperty("spring.kafka.json.topic")), bizAck);
        // register a callback with the listener to receive the result of the send asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<String, BizAck>>() {
            @Override
            public void onSuccess(SendResult<String, BizAck> result) {
                logger.info("Successfully sent message=[ " + rcvdMessage + " ] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.info("Unable to send message=[ " + rcvdMessage + " ] due to : " + ex.getMessage());
            }
        });
    }
Sending Logs Excerpt
--------------------
2019-04-23 11:06:02.999 INFO 9708 --- [enerContainer-1] g.g.g.s.n.kafka.AcknowledgementSender : Sending received acknowledgement message to Kafka for : Biz Regn: CG094562018, TIN : C0910331870, Name: COMMUNITIES FOR DEVELOPMENT
2019-04-23 11:06:02.999 INFO 9708 --- [ad | producer-1] g.g.g.s.n.kafka.AcknowledgementSender : Successfully sent message=[ {"rgdno":"CG094562018","bizname":"COMMUNITIES FOR DEVELOPMENT","tin":"C0910331870","incordate":"16-JAN-2018","commencedate":"16-JAN-2018","biz_pk":3800,"ack_at":"2019-04-23T11:06:02.858Z","ack_at_ms":1556017562858,"ack_message":"Biz regn received for : CG002642018"} ] with offset=[3556]
I send these messages successfully to a topic other than DAILY_TRANSFER_COUNTS. The DAILY_TRANSFER_COUNTS topic is derived from a query executed in KSQL on that other topic.