
I was trying to integrate Apache Camel with Kafka and wrote a sample program to read a file and write to a Kafka topic, but I am getting the error below. The reverse direction works: I am able to read from a Kafka topic and write to a file.

Stacktrace

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.apache.camel.component.file.GenericFile to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
[#0 - file://C:%5Cshare%5Cinput] KafkaProducer WARN No message key or partition key set
[#0 - file://C:%5Cshare%5Cinput] GenericFileOnCompletion WARN Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@7127845b for file: GenericFile[C:\share\input\file.txt]
[#0 - file://C:%5Cshare%5Cinput] DefaultErrorHandler ERROR Failed delivery for (MessageId: ID-L8-CWBL462-49953-1480494317350-0-21 on ExchangeId: ID-L8-CWBL462-49953-1480494317350-0-22). Exhausted after delivery attempt: 1 caught: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.apache.camel.component.file.GenericFile to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Code

@ContextName("myCdiCamelContext")
public class MyRoutes extends RouteBuilder {

    @Inject
    @Uri("file:C:\\share\\input?fileName=file.txt&noop=true")
    private Endpoint inputEndpoint;

    @Inject
    @Uri("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
    private Endpoint resultEndpoint;

    @Override
    public void configure() throws Exception {
        from(inputEndpoint)
            .to(resultEndpoint);
    }
}


1 Answer


After adding a processor that converts the GenericFile body to a String, it worked for me:

public void configure() throws Exception {
    from(inputEndpoint).process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            // Convert the GenericFile body to a String so the Kafka
            // StringSerializer configured as value.serializer can handle it
            exchange.getIn().setBody(exchange.getIn().getBody(), String.class);
            // Set an explicit partition and message key to avoid the
            // "No message key or partition key set" warning
            exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
            exchange.getIn().setHeader(KafkaConstants.KEY, "1");
        }
    })
        .to(resultEndpoint);
}
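
The same conversion can also be expressed with Camel's convertBodyTo DSL instead of an inline Processor. A minimal sketch, assuming the same endpoint URIs as the question and a classpath with camel-core and camel-kafka (not runnable on its own without those dependencies and a running broker):

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;

public class FileToKafkaRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file:C:\\share\\input?fileName=file.txt&noop=true")
            // Let Camel's type converter turn GenericFile into String
            .convertBodyTo(String.class)
            // Optional: set a message key so the producer does not warn
            .setHeader(KafkaConstants.KEY, constant("1"))
            .to("kafka:localhost:9092?topic=test");
    }
}
```

convertBodyTo triggers the same GenericFile-to-String type conversion as the explicit setBody call in the answer, just declared in the route instead of a custom Processor.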