0
votes

I am trying to update a Spring Boot service to Spring Boot 2.0.5 and spring-kafka 2.1.10 (from Spring Boot 1.5.x and spring-kafka 1.3.x). However, I get an error when I try to publish a message using the JsonSerializer.

My producer config, with JsonSerializer.ADD_TYPE_INFO_HEADERS set to false:

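import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, KafkaMessage> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> configProps = new HashMap<>();
        // Ask the JsonSerializer not to add type information as record headers.
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // String keys, JSON-serialized values.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configProps;
    }

    @Bean
    public KafkaTemplate<String, KafkaMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}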

Kafka version (edit: I am using Kafka 1.0.0):

[2018-10-08 19:20:53,562] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-08 19:20:53,562] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)

[2018-10-08 18:24:40,252] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-10-08 18:24:40,252] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)

Application error:

java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request

Error in the Kafka log:

java.lang.IllegalArgumentException: Magic v1 does not support record headers

2 Answers

2
votes

"Magic v1 does not support record headers" is thrown when the Kafka broker version and the client version are incompatible.

spring-kafka 2.1.10 does not support Kafka 2.0.0. You need spring-kafka 2.2.x to work with Kafka 2.0.0.

For Kafka client compatibility, see the official Spring documentation.

0
votes

The source of my problem was the configuration of our Kafka brokers. The following properties were set to an older Kafka version:

  • inter.broker.protocol.version
  • log.message.format.version

After removing these settings, I was able to produce and consume messages from Spring Boot 2 applications.
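
For anyone wondering why those two settings matter: as far as I understand it, record headers only exist in message format v2, which arrived with Kafka 0.11.0, so a broker pinned to an older log.message.format.version stores records as magic v0/v1 and rejects anything carrying headers. Below is a rough sketch of that rule, just to illustrate the reasoning. The class and method names (MessageFormatCheck, supportsRecordHeaders) are made up for this example and are not part of Kafka or spring-kafka, and the version parsing is simplified.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not a Kafka or spring-kafka API. */
final class MessageFormatCheck {

    /**
     * Returns true if a broker whose log.message.format.version is set to the
     * given value would store records in format v2, the first format that can
     * carry record headers (assumed here to start at 0.11).
     */
    static boolean supportsRecordHeaders(String formatVersion) {
        // Drop any suffix such as "-IV0", then parse the dotted numeric parts.
        String numeric = formatVersion.trim().split("-")[0];
        int[] parts = Arrays.stream(numeric.split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
        int major = parts[0];
        int minor = parts.length > 1 ? parts[1] : 0;
        // 1.0 and later use format v2; within 0.x the minor must be at least 11.
        return major >= 1 || minor >= 11;
    }

    public static void main(String[] args) {
        // Print the verdict for a few sample values of log.message.format.version.
        for (String v : new String[] {"0.10.2", "0.11.0", "1.0"}) {
            System.out.println(v + " -> record headers supported: " + supportsRecordHeaders(v));
        }
    }
}
```

In our case the pinned values fell on the "not supported" side of that check, which would explain why the broker complained about headers even though the broker binaries themselves were recent.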