I am trying to integrate Flume with a Spark Streaming application. I am running the Spark Scala sample FlumePollingEventCount to pull events from Flume, with the Spark job running on a single machine.
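For context, a pull-based receiver in the style of that sample looks roughly like the sketch below. It is modeled on FlumePollingEventCount rather than copied from it. It assumes the spark-streaming-flume artifact matching the Spark and Scala versions in use is on the application classpath. The object name `FlumePollingSketch` and the helper `formatCount` are hypothetical. The important detail is that Spark polls the SparkSink's host and port, not the Avro source's port.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Sketch modeled on the FlumePollingEventCount sample (not its exact source).
// Assumes spark-streaming-flume is on the classpath; names are hypothetical.
object FlumePollingSketch {
  // Formats the per-batch event count for printing.
  def formatCount(cnt: Long): String = s"Received $cnt flume events."

  def main(args: Array[String]): Unit = {
    // Host/port of the SparkSink (a1.sinks.k1.hostname / a1.sinks.k1.port),
    // not the Avro source port 41414.
    val host = if (args.length > 0) args(0) else "192.168.1.36"
    val port = if (args.length > 1) args(1).toInt else 41415

    val conf = new SparkConf().setAppName("FlumePollingSketch")
    val ssc = new StreamingContext(conf, Milliseconds(2000))

    // Polling (pull-based) receiver: Spark connects to the SparkSink and
    // pulls batches of events out of the Flume channel.
    val stream = FlumeUtils.createPollingStream(ssc, host, port)
    stream.count().map(cnt => formatCount(cnt)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```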
I have the following configuration:
Avro Source -> Memory Channel -> Spark Sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.1.36
a1.sources.r1.port = 41414
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.1.36
a1.sinks.k1.port = 41415
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
I am sending events to port 41414 using the Avro client, but Spark Streaming does not receive any events.
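In case it helps to reproduce the sending side, the sketch below does the same thing programmatically with Flume's RPC client SDK. It assumes the flume-ng-sdk artifact is on the classpath. The question does not say which Avro client was used, so treating it as equivalent to this SDK client (or to `flume-ng avro-client`) is an assumption. The object name `AvroSenderSketch` and the `body` helper are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import org.apache.flume.api.{RpcClient, RpcClientFactory}
import org.apache.flume.event.EventBuilder

// Sketch of an Avro RPC sender, assuming flume-ng-sdk is on the classpath.
// Host/port match a1.sources.r1.bind / a1.sources.r1.port; names are hypothetical.
object AvroSenderSketch {
  // Builds the body text for the i-th test event (hypothetical format).
  def body(i: Int): String = s"test event $i"

  def main(args: Array[String]): Unit = {
    val client: RpcClient = RpcClientFactory.getDefaultInstance("192.168.1.36", 41414)
    try {
      for (i <- 1 to 10) {
        // append() waits for the Avro source to acknowledge the event (or time out)
        // and throws EventDeliveryException if delivery fails.
        client.append(EventBuilder.withBody(body(i), StandardCharsets.UTF_8))
      }
    } finally {
      client.close()
    }
  }
}
```

If these appends succeed, the events have at least reached the memory channel, which narrows the problem to the SparkSink side.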
I get the following error when I start the Spark example:
WARN FlumeBatchFetcher: Did not receive events from Flume agent due to error on the Flume agent: begin() called when transaction is OPEN!
At the Flume console I get the following exception:
2016-01-07 19:56:51,344 (Spark Sink Processor Thread - 10) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.
2016-01-07 19:56:51,344 (New I/O worker #5) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel!
2016-01-07 19:56:51,353 (New I/O worker #5) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel!
2016-01-07 19:56:51,355 (Spark Sink Processor Thread - 9) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:80)] Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
    at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
Can someone give me a clue?