3 votes

I have implemented Spark Streaming using createDirectStream. My Kafka producer is sending several messages every second to a topic with two partitions.

On the Spark Streaming side, I read Kafka messages every second and then window them with a 5-second window size and slide interval.
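For context, a minimal sketch of this setup (Spark 1.5, Kafka 0.8 direct API); the broker address and topic name are placeholders, not from my actual code:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaWindowDemo")
val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batch interval

// Placeholder broker/topic; the real topic has two partitions
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("myTopic")

// Direct stream: one RDD partition per Kafka partition, no receiver
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// 5-second window, sliding every 5 seconds
val windowed = stream.map(_._2).window(Seconds(5), Seconds(5))
windowed.foreachRDD(rdd => println(s"events in window: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()
```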

The Kafka messages are processed correctly; I'm seeing the right computations and prints.

But in the Spark Web UI, under the Streaming section, the number of events per window shows as zero. Please see this image:

Spark UI showing Zero events

I'm puzzled why it shows zero. Shouldn't it show the number of Kafka messages being fed into the Spark stream?

Update:

This issue seems to happen when I use the groupByKeyAndWindow() API. When I commented out this API call, the Spark Streaming UI started reporting the Kafka event input size correctly.
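The call that coincided with the zero counts looks roughly like this; the key-extraction logic is a placeholder, as my actual record format isn't shown here:

```scala
// Build a pair DStream keyed by some field of the message value
// (splitting on a comma here is illustrative only)
val pairs = stream.map { case (_, value) => (value.split(",")(0), value) }

// groupByKeyAndWindow(windowDuration, slideDuration) is the call that,
// when commented out, made the UI report input sizes correctly again
val grouped = pairs.groupByKeyAndWindow(Seconds(5), Seconds(5))

grouped.foreachRDD { rdd =>
  rdd.foreach { case (key, values) => println(s"$key -> ${values.size} events") }
}
```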

Any idea why this is so? Could this be a defect in Spark Streaming?

I'm using Cloudera CDH 5.5.1, Spark 1.5.0, and Kafka KAFKA-0.8.2.0-1.kafka1.4.0.p0.56.


Are you restoring the job from a checkpoint directory? It seems there is an issue registered here: issues.apache.org/jira/browse/SPARK-11152 – avr

Are you sure you're looking at the right streaming job? – Yuval Itzchakov

@avr, to verify that there is no checkpoint in my code, I commented out the usage of the updateStateByKey API and ssc.checkpoint(checkpointDir). But I still don't see the right "event size". The Spark Streaming UI reports the first event size properly, and thereafter it always says "zero". – Sudheer Palyam

Have you been able to find a solution to this problem? I'm having similar issues regardless of direct or receiver-based Kafka input streams. – akizl

I'm having a similar issue with a custom receiver, except that all my input sizes are 1. I'm using a window function with the slideDuration and windowDuration. – Saif Charaniya

1 Answer

0 votes

It seems that this metric is simply not recorded by the Spark Kafka library code.

Based on the code of Spark 2.3.1:

  1. Search for Input Size / Records: it is the value of stageData.inputBytes (StagePage.scala).
  2. Search for StageData and inputBytes: it is the value of metrics.inputMetrics.bytesRead (LiveEntity.scala).
  3. Search for bytesRead: it is set in HadoopRDD.scala, FileScanRDD.scala, and ShuffleSuite.scala, but not in any Kafka-related files.