
I am trying to integrate Flume with a Spark Streaming application. I am running the Spark Scala sample FlumePollingEventCount to pull events from Flume, with the Spark job running on a single machine.

I have the following configuration:

Avro Source -> Memory Channel -> Spark Sink

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = 192.168.1.36
    a1.sources.r1.port = 41414

    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.hostname = 192.168.1.36
    a1.sinks.k1.port = 41415

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
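For reference, the Spark side of this setup looks roughly like the following (a minimal sketch along the lines of the FlumePollingEventCount example from spark-streaming-flume; the host and port are assumed to match the Spark sink configured above):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingEventCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Pull events from the SparkSink configured as a1.sinks.k1 above
    val stream = FlumeUtils.createPollingStream(ssc, "192.168.1.36", 41415)

    // Count the events in each batch and print the result
    stream.count().map(cnt => s"Received $cnt flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the receiver connects to the Spark sink's port (41415), not the Avro source's port (41414).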

I am sending events to port 41414 using an Avro client, but Spark Streaming is not able to receive any events.

I get the following error when I start the Spark example:

    WARN FlumeBatchFetcher: Did not receive events from Flume agent due to error on the Flume agent: begin() called when transaction is OPEN!

At the Flume console I get the following exception:

    2016-01-07 19:56:51,344 (Spark Sink Processor Thread - 10) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.
    2016-01-07 19:56:51,344 (New I/O worker #5) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel!
    2016-01-07 19:56:51,353 (New I/O worker #5) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Received an error batch - no events were received from channel!
    2016-01-07 19:56:51,355 (Spark Sink Processor Thread - 9) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:80)] Error while processing transaction.
    java.lang.IllegalStateException: begin() called when transaction is OPEN!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)

Can someone give me a clue?


3 Answers

2
votes

I ran into the same problem when I used spark-flume-approach2 but included a different version of spark-streaming-flume_${spark.scala.version} in the Flume classpath. If you include the exact versions specified in the above link, you should not see this error again.
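To illustrate, the version pinning might look like this in the application build (a sketch, not a prescription — the 1.6.0/2.11 versions below are examples; use whichever Spark release and Scala version you actually run, and make sure the jars placed in Flume's lib directory are built against the same versions):

```scala
// build.sbt (fragment) -- keep these versions in lockstep with the jars
// dropped into Flume's classpath (spark-streaming-flume-sink, scala-library)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "1.6.0"
)
```

The "begin() called when transaction is OPEN!" error in this thread is a symptom of the Flume agent loading a sink jar compiled against a different Scala or Spark version than the driver uses.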

2
votes

I met the same problem, and it was indeed caused by mismatched jar versions. The issue was resolved after I replaced scala-library-2.11.7.jar with scala-library-2.11.8.jar. Still, the initial message 'begin() called when transaction is OPEN!' could be more meaningful. Kudos to Sandeep and user2710368.

0
votes

In my case, which is very similar to SandeepKumar's, my lib dir contained two versions of scala-library. Deleting the old one and keeping the required one solved my problem, after it had cost me 5 hours.