i'm having an issue with Spark-Streaming and Kafka. While running a sample program to consume from a Kafka topic and output micro-batched results to the terminal, my job seems to hang when i set the option:
df.option("startingOffsets", "earliest")
Starting the job from the latest offset works fine, results are printed to the terminal as each micro batch streams through.
I was thinking maybe this was a resouces issue--i'm trying to read from a topic with quite a bit of data. However i don't seem to have memory/cpu issues (running this job with a local[*] cluster). The job never really seems to start, but just hangs on the line:
19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A
val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
val streamContext = new StreamingContext(sc, Seconds(1))
val spark = SparkSession.builder().appName("spark-test")
.getOrCreate()
val topic = "topic.with.alotta.data"
//subscribe tokafka
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
//write
df.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.start()
.awaitTermination()
I'd expect to see results printed to the console....but, the application just seems to hang as I mentioned. Any thoughts? It feels like a spark resource issue (because i'm running a local "cluster" against a topic that has a lot of data. Is there something about the nature of streaming dataframes that i'm missing?
java.lang.OutOfMemoryError: Java heap space- OneCricketeer