I am trying to integrate Apache Camel with Kafka and wrote a sample program that reads a file and writes it to a Kafka topic, but I get the error below. The reverse direction works: I can read from a Kafka topic and write to a file.
Stacktrace
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.apache.camel.component.file.GenericFile to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
[#0 - file://C:%5Cshare%5Cinput] KafkaProducer WARN No message key or partition key set
[#0 - file://C:%5Cshare%5Cinput] GenericFileOnCompletion WARN Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@7127845b for file: GenericFile[C:\share\input\file.txt]
[#0 - file://C:%5Cshare%5Cinput] DefaultErrorHandler ERROR Failed delivery for (MessageId: ID-L8-CWBL462-49953-1480494317350-0-21 on ExchangeId: ID-L8-CWBL462-49953-1480494317350-0-22). Exhausted after delivery attempt: 1 caught: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.apache.camel.component.file.GenericFile to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Code
@ContextName("myCdiCamelContext")
public class MyRoutes extends RouteBuilder {

    @Inject
    @Uri("file:C:\\share\\input?fileName=file.txt&noop=true")
    private Endpoint inputEndpoint;

    @Inject
    @Uri("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
    private Endpoint resultEndpoint;

    @Override
    public void configure() throws Exception {
        from(inputEndpoint)
            .to(resultEndpoint);
    }
}
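Reading the exception literally, the message body that reaches the Kafka producer appears to be a GenericFile (a handle to the file) rather than a String, while value.serializer is the StringSerializer. The sketch below reproduces that kind of mismatch with plain Java only, with no Camel or Kafka on the classpath. `serializeAsString` and `FileHandle` are hypothetical stand-ins invented for illustration, not Camel or Kafka APIs. In the actual route, the analogous step would presumably be a `.convertBodyTo(String.class)` between `from(inputEndpoint)` and `.to(resultEndpoint)`; I have not verified that against this setup.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class SerializerMismatchSketch {

    // Hypothetical stand-in for a String-only value serializer:
    // it rejects anything that is not already a String.
    static byte[] serializeAsString(Object value) {
        if (!(value instanceof String)) {
            throw new IllegalArgumentException(
                "Can't convert value of class " + value.getClass().getName() + " to String");
        }
        return ((String) value).getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for the file consumer's message body:
    // a handle pointing at the file, not the file's content.
    static final class FileHandle {
        final Path path;

        FileHandle(Path path) {
            this.path = path;
        }
    }

    // The explicit conversion step: load the file content as a String.
    static String readContent(FileHandle handle) throws IOException {
        return new String(Files.readAllBytes(handle.path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("file", ".txt");
        try {
            Files.write(tmp, "hello kafka".getBytes(StandardCharsets.UTF_8));
            FileHandle body = new FileHandle(tmp);

            // Passing the handle straight through fails, as in the stack trace.
            try {
                serializeAsString(body);
            } catch (IllegalArgumentException e) {
                System.out.println("Without conversion: " + e.getMessage());
            }

            // Converting the body to a String first lets serialization succeed.
            byte[] payload = serializeAsString(readContent(body));
            System.out.println("With conversion: " + payload.length + " bytes serialized");
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

This would also be consistent with the reverse direction working, since a String read from Kafka can be written to a file without any such conversion.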