9 votes

I'm facing memory issues running a structured stream with aggregation and partitioning in Spark 2.2.0:

session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "yyyy"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();

During testing I noticed that the amount of used memory increases each time new data arrives, and eventually the executors exit with code 137:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

I created a heap dump and found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.

At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming files in the source folder so that they would be picked up by Spark again. Since the input records are the same, all subsequent rows should be rejected as duplicates and memory consumption shouldn't increase, but it does.

[Screenshot: executor memory usage]

Moreover, GC time took more than 30% of the total processing time:

[Screenshot: GC time as a share of total processing time]

Here is a heap dump taken from an executor running with a smaller amount of memory than in the screenshots above, because when I tried to create a dump from that executor the Java process terminated in the middle of the process.

[Screenshot: executor heap dump]


2 Answers

4 votes

Migrating my comment from SPARK-23682, which the asker of this question also filed as an issue.

The HDFS state store provider excessively caches multiple versions of the state in memory, 100 versions by default. The issue is addressed by SPARK-24717, which makes it maintain only two versions of state in memory (the current one for replay and a new one for updates). The patch will be available in Spark 2.4.0.
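
Once you are on Spark 2.4.0, the number of retained versions should also be tunable directly. A minimal sketch, assuming the configuration key introduced by SPARK-24717 is spark.sql.streaming.maxBatchesToRetainInMemory (an assumption on my side; please verify the exact name and default against your Spark version):

    import org.apache.spark.sql.SparkSession;

    SparkSession session = SparkSession.builder()
        .appName("test-stream")
        // Assumed option from SPARK-24717 (Spark 2.4.0): caps how many versions
        // of state HDFSBackedStateStoreProvider keeps in memory per partition
        // (the patch lowers this to 2).
        .config("spark.sql.streaming.maxBatchesToRetainInMemory", 2)
        .getOrCreate();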

1 vote

I think the root cause is that you do not use a watermark along with dropDuplicates, so all the state is kept and never dropped. Have a look at: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication
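
A minimal sketch of how that could look for this query, assuming testTimestamp is parsed as a timestamp (event-time) column and that one day of lateness is enough to cover your duplicates (both assumptions on my part, not taken from the original query):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> deduplicated = session
        .readStream()
        .schema(inputSchema)
        .csv("s3://test-bucket/input")
        // The watermark lets Spark evict deduplication state older than the
        // delay threshold instead of keeping it forever.
        .withWatermark("testTimestamp", "24 hours")
        // Including the event-time column in the deduplication key follows the
        // streaming-deduplication example in the programming guide and bounds
        // the amount of state that has to be kept.
        .dropDuplicates("testId", "testName", "testTimestamp");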