I'm using Spring Boot to send data from one application to the other using Kafka.

My design uses an interface to declare the data being sent:

package domain;

// public so that classes in other packages (e.g. persistence) can implement it
public interface Data {
    String getData();
    void setData(String data);
}

Producer

In the source app, I implement this interface as a db Entity.

package persistence;

@Data
class DataEntity implements Data {
    private String data;  // lombok generates getter/setters
}

When an entity is added, I want to send it as an update to Kafka using the KafkaTemplate:

@Component
class DataPublisher implements ApplicationListener<DataEvent> {
    @Autowired private KafkaTemplate<String, Data> template;

    // I left out DataEvent, which is a straightforward ApplicationEvent subclass.
    // Implementing ApplicationListener already registers this method, so no
    // @EventListener annotation is needed (having both would likely deliver each event twice).
    @Override
    public void onApplicationEvent(DataEvent event) {
        template.send("data", (Data) event.getSource());
    }
}
// triggered by this call in a service
eventPublisher.publishEvent(new DataEvent(updatedData));

Serialization is configured via these properties:

spring:
    kafka:
        consumer:
            value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
            properties.spring.json.value.default.type: domain.Data

Looking at kafkacat output, the data is sent fine.

Consumer

On the receiving side I have

@KafkaListener(topics = "data")
public void dataUpdated(@Payload Data data) {
    dataService.updateData(data);
}

which results in

Caused by: java.lang.IllegalArgumentException: The class 'persistence.DataEntity' is not in the trusted packages [...]

which I understand: the serializer sends a persistence.DataEntity object, but the client expects a domain.Data object. But that is how the design is supposed to work; I want the client to know only about the domain package, not its persistence implementation. (As a side question, where can I see this type header? As far as I can tell it's not in the encoded JSON, so what am I missing?)

So the question is: how do I force the Spring JsonSerializer to send domain.Data as the serialized data type?

I did find a TYPE_MAPPING property in the serializer class, but its only documentation is that it "add[s] type mappings to the type mapper: 'foo:com.Foo,bar:com.Bar'" which doesn't explain anything and I can't find an example usage.

EDIT:

I did add

spring.kafka.producer.properties.spring.json.type.mapping=domain.Data:persistence.DataEntity

to the properties of the producer, but that didn't help.

1 Answer

See the documentation.

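You have to provide the type mapping on both sides: the producer maps a token to the class it serializes, and the consumer maps the same token to the class it wants to deserialize into.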
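
As a rough sketch of what "both sides" could look like, the snippet below builds the extra properties as plain maps. The token `data` is arbitrary, and `consumer.DataDto` is a hypothetical concrete class on the consumer side (Jackson cannot instantiate the `domain.Data` interface directly). The key `spring.json.type.mapping` is the value behind the `TYPE_MAPPINGS` constant. As for the side question, the type information travels in a record header (`__TypeId__`), not in the JSON body, which is why it does not show up in the payload.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: shows the matching properties, not a full Spring configuration. */
public class TypeMappingSketch {

    // Arbitrary token written to the type header instead of the producer's class name.
    static final String TOKEN = "data";

    /** Producer side: the token maps to the class that is actually serialized. */
    public static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        props.put("spring.json.type.mapping", TOKEN + ":persistence.DataEntity");
        return props;
    }

    /** Consumer side: the same token maps to a class the consumer knows about. */
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // consumer.DataDto is a hypothetical implementation of domain.Data
        props.put("spring.json.type.mapping", TOKEN + ":consumer.DataDto");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().get("spring.json.type.mapping"));
        System.out.println(consumerProps().get("spring.json.type.mapping"));
    }
}
```

With Spring Boot, the same entries would go under `spring.kafka.producer.properties.*` and `spring.kafka.consumer.properties.*` respectively. The part before the colon must be identical on both sides.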

However, instead of using the JsonDeserializer, you should use a BytesDeserializer and a BytesJsonMessageConverter (simply add one as a @Bean and Boot will wire it into the container factory).

That way, the framework will automatically convert to the parameter type.
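
A minimal sketch of that setup on the consumer side might look like the following. It assumes the consumer's `value-deserializer` property is switched to `org.apache.kafka.common.serialization.BytesDeserializer` and that a group id is configured elsewhere. `KafkaConverterConfig`, `DataDto`, and `DataListener` are hypothetical names; in the setup from the question, `DataDto` would presumably implement `domain.Data`.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.stereotype.Component;

@Configuration
class KafkaConverterConfig {

    // Boot picks up a RecordMessageConverter bean and wires it into the listener container factory.
    @Bean
    RecordMessageConverter jsonConverter() {
        return new BytesJsonMessageConverter();
    }
}

// Hypothetical concrete payload type; the converter targets the listener's parameter type.
class DataDto {
    private String data;

    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
}

@Component
class DataListener {

    @KafkaListener(topics = "data")
    public void dataUpdated(DataDto data) {
        // The raw bytes have been converted to DataDto before this method is called.
        System.out.println(data.getData());
    }
}
```

The design point is that the target type comes from the listener method signature, so the consumer never needs to know the producer's class name.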

Again, see the documentation.