0
votes

i am posting below data into kafka and receive through spring integration channels and transform into Log object, how do i transform below data into Log object using spring integration transformer? appreciate any help here

'Log(clientKey=string, payload=string)'

Here is the channel adapter code

@Bean
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
            kafkaListenerContainer());
    kafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(inputChannel());
    return kafkaMessageDrivenChannelAdapter;
}

when i try to convert within the service activator using below

 ObjectMapper objectMapper = new ObjectMapper();
 Log msg = objectMapper.readValue(arg0.getPayload().toString() , Log.class);

its failing with

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Log': was expecting ('true', 'false' or 'null')

1

1 Answers

2
votes

First of all the 'Log(clientKey=string, payload=string)' doesn't look like well-formed JSON as you would like to convert from later in the Spring Integration.

Another concern that your do kafkaMessageDrivenChannelAdapter.setPayloadType(Log.class);. This way you don't need any downstream transformation and MessagingMessageConverter in the IntegrationRecordMessageListener will do the conversion job for us.

However we still need to have a proper data to be able to convert from.

You also need to keep in mind that Apache Kafka has its own mechanisms to deserialize a byte[] from the wire. See Spring for Apache Kafka for more information about conversion and (de)serialization: https://docs.spring.io/spring-kafka/docs/current/reference/html/_reference.html#serdes

For the proper JSON conversion Spring Integration provides a JsonToObjectTransformer component, which can be used with the @Transformer annotation: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-transformation-chapter.html#_common_transformers

UPDATE

@Transformer(inputChannel = "inputChannel", outputChannel = "processChannel")
@Bean
public JsonToObjectTransformer jsonToObjectTransformer() {
   return new JsonToObjectTransformer(Log.class);
}