0
votes

I'm unable to reproduce the documentation or sample code for getting a non-String key serialized.

My goal is to use the key (field) to pass control actions alongside the data.

The classes ControlChannel and SchedulerEntry are regular POJOs.

Environment is:

  • Java 11
  • Spring Boot 2.4.1
  • Kafka 2.6.0

Expected code to Serialize/Deserialize:

Listener and Template


    /**
     * Receives records from the "Scheduler" topic as a member of the
     * "scheduler" consumer group, using the container factory bean named
     * {@code schedulerKafkaListenerContainerFactory}.
     *
     * @param control   value bound from the {@code RECEIVED_MESSAGE_KEY} header
     * @param timestamp value bound from the {@code RECEIVED_TIMESTAMP} header
     * @param entry     the message payload
     */
    @KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control, 
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Payload SchedulerEntry entry) {

        // Log both parts of the record so the key and the payload can be checked separately.
        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    /**
     * Exposes a {@code KafkaTemplate} typed with {@code ControlChannel} keys and
     * {@code SchedulerEntry} values, built on {@link #schedulerProducerFactory()}.
     *
     * @return the template bean
     */
    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }

**First Try - Consumer and Producer (Type Mapping and Trusted Packages)**


    /**
     * First attempt: producer factory that passes explicit {@code JsonSerializer}
     * instances for both the key and the value.
     *
     * @return the producer factory bean
     */
    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Type-info headers are switched off here while type mappings are still declared below.
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.put(JsonSerializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // NOTE(review): configure(props, ...) is never called on the serializer
        // instances created below, so the JsonSerializer.* entries in props
        // presumably never reach them - confirm against the Spring Kafka docs.
        return new DefaultKafkaProducerFactory<>(props, 
            new JsonSerializer<ControlChannel>(),
            new JsonSerializer<SchedulerEntry>());
    }
   
    
    /**
     * First attempt: consumer factory that builds the key and value
     * {@code JsonDeserializer} instances by hand and configures them from
     * the same property map.
     *
     * @param groupId consumer group id stored under {@code GROUP_ID_CONFIG}
     * @return a consumer factory using the deserializer instances created here
     */
    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
    
        // Key deserializer, configured from props.
        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);
    
        // NOTE(review): the call below configures k a second time; v itself is
        // never configured in this method.
        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        k.configure(props, true);
    
        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }
    
    /**
     * Listener container factory for the scheduler listener; it is backed by
     * {@code consumerFactory("scheduler")}.
     *
     * @return the container factory bean
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // "scheduler" is passed as the consumer group id.
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }

Exception:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Scheduler-0 at offset 25. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

**Second Try - Consumer and Producer (Just setting Key serializer/deserializer as Json)**

/**
 * Second attempt: producer factory that only names {@code JsonSerializer} as
 * the key and value serializer class in the property map; no serializer
 * instances and no type mappings are supplied.
 *
 * @return the producer factory bean
 */
@Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Second attempt: consumer factory that names {@code JsonDeserializer} in
     * the property map and also passes deserializer instances constructed with
     * explicit target classes.
     *
     * @param groupId consumer group id stored under {@code GROUP_ID_CONFIG}
     * @return a consumer factory using the two deserializer instances below
     */
    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // NOTE(review): these instances are passed without a configure(props, isKey)
        // call, so the key deserializer is presumably not marked as a key
        // deserializer - confirm whether that explains the key arriving as a
        // SchedulerEntry in the reported exception.
        return new DefaultKafkaConsumerFactory<>(props, new JsonDeserializer<>(ControlChannel.class), new JsonDeserializer<>(SchedulerEntry.class));
    }

    /**
     * Listener container factory for the scheduler listener; it is backed by
     * {@code consumerFactory("scheduler")}.
     *
     * @return the container factory bean
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // "scheduler" is passed as the consumer group id.
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }


Exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: 
Listener method 'public void  io.infolayer.aida.scheduler.KafkaSchedulerListener.listenForScheduler(io.infolayer.aida.ControlChannel,long,io.infolayer.aida.entity.SchedulerEntry)' 
threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]
1

1 Answer

0
votes

There are several problems with your first attempt.

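  • In the producer factory, the serializer instances are never configured: call configure() on each of them, with ADD_TYPE_INFO_HEADERS set to true in the properties.
  • In the consumer factory, you call configure() on k twice and never configure v (the value deserializer); the second call should be v.configure(props, false).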

This works as expected...

/**
 * Self-contained Spring Boot sample that sends and receives Kafka records
 * whose key ({@link ControlChannel}) and value ({@link SchedulerEntry}) are
 * both JSON-serialized POJOs.
 *
 * <p>The serializers and deserializers are created by hand, so each one is
 * explicitly configured from the shared property map before it is handed to
 * its factory.
 */
@SpringBootApplication
public class So65501295Application {

    private static final Logger log = LoggerFactory.getLogger(So65501295Application.class);

    /** Broker address shared by the producer and the consumer. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Token-to-class mappings shared by the serializers and the deserializers. */
    private static final String KEY_VALUE_TYPE_MAPPINGS =
            "key:com.example.demo.So65501295Application.ControlChannel, "
                    + "value:com.example.demo.So65501295Application.SchedulerEntry";

    /**
     * Application entry point.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(So65501295Application.class, args);
    }

    /**
     * Builds the producer factory with JSON serializers for key and value.
     * Type-info headers are enabled and each serializer is configured from
     * the same property map.
     *
     * @return the producer factory bean
     */
    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
        config.put(JsonSerializer.TYPE_MAPPINGS, KEY_VALUE_TYPE_MAPPINGS);

        // The second configure() argument is true for the key side, false for the value side.
        JsonSerializer<ControlChannel> keySerializer = new JsonSerializer<>();
        keySerializer.configure(config, true);

        JsonSerializer<SchedulerEntry> valueSerializer = new JsonSerializer<>();
        valueSerializer.configure(config, false);

        return new DefaultKafkaProducerFactory<>(config, keySerializer, valueSerializer);
    }

    /**
     * Builds a consumer factory with JSON deserializers for key and value,
     * each configured from the same property map.
     *
     * @param groupId consumer group id stored under {@code GROUP_ID_CONFIG}
     * @return a consumer factory using the configured deserializers
     */
    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        config.put(JsonDeserializer.TYPE_MAPPINGS, KEY_VALUE_TYPE_MAPPINGS);

        // The second configure() argument is true for the key side, false for the value side.
        JsonDeserializer<ControlChannel> keyDeserializer = new JsonDeserializer<>();
        keyDeserializer.configure(config, true);

        JsonDeserializer<SchedulerEntry> valueDeserializer = new JsonDeserializer<>();
        valueDeserializer.configure(config, false);

        return new DefaultKafkaConsumerFactory<>(config, keyDeserializer, valueDeserializer);
    }

    /**
     * Receives records from the "Scheduler" topic and logs the key and payload.
     *
     * @param control   value bound from the {@code RECEIVED_MESSAGE_KEY} header
     * @param timestamp value bound from the {@code RECEIVED_TIMESTAMP} header
     * @param entry     the message payload
     */
    @KafkaListener(
            topics = "Scheduler",
            groupId = "scheduler",
            containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Payload SchedulerEntry entry) {

        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    /**
     * Exposes the template used to publish scheduler records.
     *
     * @return a template built on {@link #schedulerProducerFactory()}
     */
    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        ProducerFactory<ControlChannel, SchedulerEntry> producerFactory = schedulerProducerFactory();
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * Container factory referenced by name from the listener annotation.
     *
     * @return a container factory backed by {@code consumerFactory("scheduler")}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory("scheduler"));
        return containerFactory;
    }

    /**
     * Sends one record with an empty key and an empty value to the
     * "Scheduler" topic when the runner executes.
     *
     * @param template the template used for sending
     * @return the runner bean
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<ControlChannel, SchedulerEntry> template) {
        return args -> template.send("Scheduler", new ControlChannel(), new SchedulerEntry());
    }

    /**
     * Declares the "Scheduler" topic with one partition and one replica.
     *
     * @return the topic definition bean
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("Scheduler")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /** Record key type: a plain bean with a single {@code foo} property. */
    public static class ControlChannel {

        String foo;

        public String getFoo() {
            return foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

    /** Record value type: a plain bean with a single {@code foo} property. */
    public static class SchedulerEntry {

        String foo;

        public String getFoo() {
            return foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

}
2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   
    : received data KEY ='com.example.demo.So65501295Application$ControlChannel@44a72886'
2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   
    : received data PAYLOAD = 'com.example.demo.So65501295Application$SchedulerEntry@74461c59'