I followed the tutorial at http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ and I am able to insert data from the Avro console producer into Cassandra. Now I am trying to extend this to use Flume: I have Flume set up on my machine to pick up a log file and push it to Kafka, and from there I am trying to insert the data into the Cassandra database. In a text file I am putting this data:
{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2}
{"id": 2, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 99.5}
{"id": 3, "created": "2016-05-06 13:55:00", "product": "FU-DATAMOUNTAINEER-20150201-100", "price": 10000}
{"id": 4, "created": "2016-05-06 13:56:00", "product": "FU-KOSPI-C-20150201-100", "price": 150}
Flume picks up that data and pushes it to Kafka.
On the Cassandra sink side, I am getting this error:
ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-09-28 15:47:00,951] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-09-28 15:47:00,951] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
[2016-09-28 15:47:00,952] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)
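If I understand the error correctly, the AvroConverter on the Connect worker expects messages in the Confluent wire format (a 0x00 magic byte and a 4-byte schema registry ID in front of the Avro payload), while Flume's KafkaSink publishes the raw event body, i.e. the plain JSON line. A small Java sketch of the framing as I understand it (the class and method names here are my own illustration, not the actual Confluent source):

// Hypothetical illustration of the Confluent wire-format framing that
// io.confluent.connect.avro.AvroConverter expects; not the actual Confluent code.
public class WireFormatCheck {
    static int schemaIdOf(byte[] payload) {
        // Confluent-framed Avro: [0x00 magic byte][4-byte schema ID][Avro binary]
        if (payload == null || payload.length < 5 || payload[0] != 0x00) {
            // A plain JSON line from Flume starts with '{' (0x7B), so it fails here:
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return java.nio.ByteBuffer.wrap(payload, 1, 4).getInt();
    }
}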
This is the schema I am using with the Avro console producer:
./confluent/bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic orders-topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}]}'
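For reference, producing one of these records from Java with the Confluent Avro serializer would look roughly like this (a sketch; the class name OrdersAvroProducer is mine, and it assumes kafka-clients, avro, and kafka-avro-serializer on the classpath plus a schema registry at http://localhost:8081):

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrdersAvroProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer writes the magic byte + schema ID framing automatically.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // Same schema as passed to the console producer above.
        String schemaJson = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"created\",\"type\":\"string\"},"
            + "{\"name\":\"product\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"double\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 1);
        record.put("created", "2016-05-06 13:53:00");
        record.put("product", "OP-DAX-P-20150201-95.7");
        record.put("price", 94.2);

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders-topic", record));
            producer.flush();
        }
    }
}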
Config for Flume (Flume-kafka.conf.properties):
agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576
agent.sources.spoolDirSrc.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = orders-topic
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.channel = memoryChannel
agent.sinks.kafkaSink.batchSize = 20
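As far as I can tell, the Flume KafkaSink forwards the event body bytes unchanged, so what lands on orders-topic is plain JSON text, not Confluent-framed Avro. One idea I am considering is switching the Connect worker's value converter from Avro to JSON (a sketch of the worker properties; I am not sure whether the Cassandra sink accepts schemaless JSON):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false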
Can anyone please help me figure out how to fix this error?