I am working on a distributed microservices application that uses Kafka for internal communication. The services exchange POJOs over topics. When a producer sends a message, the JsonSerializer by default adds a header containing the package and class name of the payload object, and the consuming application uses this header to deserialise the payload. But this forces me to define exactly the same class, in the same package, in both applications, which is not a good design for me. If I set the producer-side configuration (JsonSerializer.ADD_TYPE_INFO_HEADERS) so the type is not sent in the header, deserialisation fails with an error on the consumer side. I also don't want to configure a default type on the consumer application, because it has multiple listeners that expect different object types. Why can't the @KafkaListener simply deserialise the JSON payload to the type declared in its method argument? Why does it need the header?
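For reference, disabling the header on the producer is just a property switch; this is a minimal sketch of what I tried (the rest of the serializer setup is unchanged):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Producer properties sketch: suppress the __TypeId__ header that
// JsonSerializer adds by default. With this set to false, the consumer's
// JsonDeserializer no longer knows which class to instantiate and fails.
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
```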
To work around this I defined a ConsumerFactory with a BytesDeserializer and a KafkaListenerContainerFactory with a BytesJsonMessageConverter on the consumer application. With this it works on the consumer side, but I am not sure how to achieve the same on the producer side when using a ReplyingKafkaTemplate and deserialising the reply coming back from the consumer.
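Concretely, the working consumer-side setup looks roughly like this (bean and parameter names here are illustrative, not my exact code):

```java
// Listener container factory sketch: BytesDeserializer leaves the payload as raw
// bytes, and BytesJsonMessageConverter then maps the JSON to the parameter type
// declared by each @KafkaListener method, so no type header is needed.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
            consumerConfigs(), new StringDeserializer(), new BytesDeserializer()));
    factory.setMessageConverter(new BytesJsonMessageConverter());
    return factory;
}
```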
Below are my configurations.

//producer configs
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
    return props;
}

@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> replyTemplate() {
    return new KafkaTemplate<>(replyProducerFactory());
}
//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
    ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
            new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
    replyingKafkaTemplate.setReplyTimeout(10000);
    replyingKafkaTemplate.setMessageConverter(converter());
    return replyingKafkaTemplate;
}

@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}

@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
    return props;
}
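For completeness, this is roughly how the requesting side uses the template (topic and variable names are placeholders); the value of the reply record is what needs to be deserialised to the listener application's own AccountResponse class via the "cat" type mapping:

```java
// Request/reply sketch: send a request record and block for the correlated reply.
ProducerRecord<String, Object> record = new ProducerRecord<>(requestTopic, accountRequest);
RequestReplyFuture<String, Object, Object> future = replyingKafkaTemplate().sendAndReceive(record);
ConsumerRecord<String, Object> reply = future.get(10, TimeUnit.SECONDS);
Object value = reply.value(); // expected to be this application's AccountResponse
```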