I am working on a distributed microservices application that uses Kafka for internal communication. The applications exchange POJOs over topics. When a producer sends a message to a consumer, a header is added by default indicating the package name and class name of the object in the payload. The consumer application then uses this information to deserialise the payload. But this requires me to define the exact same class in the same package in both applications, which is not a good design for me. If I set the configuration (JsonSerializer.ADD_TYPE_INFO_HEADERS) on the producer side so that the type is not sent in the header, it results in an error on the consumer side. I also don't want to use a default type on the consumer application, as it has multiple listeners that expect different types of objects. Why can't the @KafkaListener simply deserialise the JSON payload to the object type given in the method argument? Why does it need the header?

To work around this I defined a consumerFactory with a 'BytesDeserializer' and a KafkaListenerContainerFactory with a 'BytesJsonMessageConverter' on the consumer application. With this it worked on the consumer side, but I am not sure how to make this work on the producer side, which uses a ReplyingKafkaTemplate and has to deserialise the reply from the consumer.

Below are my configurations:

//producer configs
@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
  return props;
}

@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> replyTemplate() {
  return new KafkaTemplate<>(replyProducerFactory());
}

//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
    ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
        new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
    replyingKafkaTemplate.setReplyTimeout(10000);
    replyingKafkaTemplate.setMessageConverter(converter());
    return replyingKafkaTemplate;
}

@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}

@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
    return props;
}

1 Answer

You can use type mapping.

The producer maps com.acme.Foo to foo and the consumer maps foo to com.other.Bar.

The types must be compatible at the JSON level.
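
As a rough illustration of the mapping described above, the sketch below builds the two property maps using the string key behind JsonSerializer.TYPE_MAPPINGS and JsonDeserializer.TYPE_MAPPINGS. The class names com.acme.Foo and com.other.Bar are the placeholders from this answer; the class TypeMappingSketch and its method names are made up for the example, and in a real application the entries would be added to the existing producerConfigs() and consumerConfigs() maps.

```java
import java.util.HashMap;
import java.util.Map;

final class TypeMappingSketch {

    // String value of JsonSerializer.TYPE_MAPPINGS / JsonDeserializer.TYPE_MAPPINGS.
    static final String TYPE_MAPPINGS = "spring.json.type.mapping";

    // Sending application: put the token "foo" in the type header
    // instead of the fully qualified class name com.acme.Foo.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(TYPE_MAPPINGS, "foo:com.acme.Foo");
        return props;
    }

    // Receiving application: resolve the same token to its own class,
    // which only has to be compatible with the JSON, not identical.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(TYPE_MAPPINGS, "foo:com.other.Bar");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```

Several mappings can be listed in one value, separated by commas. The configuration in the question already follows this pattern with the token cat.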

If you only receive one type, you can configure the deserializer to use that instead of looking for headers with type information.

https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#serdes-json-config

  • JsonDeserializer.KEY_DEFAULT_TYPE: Fallback type for deserialization of keys if no header information is present.

  • JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.
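
For the single-type case, a minimal sketch of the consumer properties might look like this. It uses the string keys behind the two constants listed above; the class DefaultTypeSketch, its method name, and the target class names passed in are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

final class DefaultTypeSketch {

    // String values of JsonDeserializer.KEY_DEFAULT_TYPE and VALUE_DEFAULT_TYPE.
    static final String KEY_DEFAULT_TYPE = "spring.json.key.default.type";
    static final String VALUE_DEFAULT_TYPE = "spring.json.value.default.type";

    // Builds deserializer properties that fall back to the given classes
    // when a record carries no type information in its headers.
    static Map<String, Object> consumerProps(String keyClassName, String valueClassName) {
        Map<String, Object> props = new HashMap<>();
        props.put(KEY_DEFAULT_TYPE, keyClassName);
        props.put(VALUE_DEFAULT_TYPE, valueClassName);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("java.lang.String", "com.other.Bar"));
    }
}
```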

Starting with version 2.5, you can add a function which will be called by the deserializer so you can introspect the data to determine the type.

See the "Using Methods to Determine Types" section of the reference documentation linked above.
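
In Spring Kafka the hook is a public static method that returns a Jackson JavaType and is registered by its fully qualified name under JsonDeserializer.VALUE_TYPE_METHOD (or KEY_TYPE_METHOD); it may take (byte[] data, Headers headers), among other signatures. To stay dependency-free, the sketch below shows only the introspection step and returns a Class<?>, which a real hook could pass to TypeFactory.defaultInstance().constructType(...). The ErrorResponse type and the "errorCode" field are hypothetical; AccountResponse stands in for the class from the question.

```java
import java.nio.charset.StandardCharsets;

final class TypeSniffSketch {

    // Stand-ins for the application's own reply classes (hypothetical shapes).
    static final class AccountResponse { }
    static final class ErrorResponse { }

    // Peeks at the raw JSON bytes and picks a target class.
    // Assumption: only error replies contain an "errorCode" field.
    static Class<?> resolve(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        if (json.contains("\"errorCode\"")) {
            return ErrorResponse.class;
        }
        return AccountResponse.class;
    }

    public static void main(String[] args) {
        byte[] reply = "{\"errorCode\":42}".getBytes(StandardCharsets.UTF_8);
        System.out.println(resolve(reply).getSimpleName());
    }
}
```

A substring check like this is deliberately crude; it is cheap because it avoids parsing the payload twice, but a field with the same name nested elsewhere in the JSON would also match.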

This and type mapping are the only ways to handle multiple types in the replying template. On the consumer side, we can infer the type from the listener method parameter (which is the correct mechanism to use there; it is not a "workaround").