2
votes

I'm running emr-5.2.0 and have one year of data stored in S3 as Parquet that is partitioned by day. When querying for one month I'm expecting Spark to only load one month of data into memory. However, my cluster memory usage looks like I'm loading the full years 1.7TB of data.

Spark Memory Usage

I'm assuming that I can load the full data lake like this

val lakeDF = spark.sqlContext.read.parquet("s3://mybucket/mylake.parquet")
lakeDF.cache()
lakeDF.registerTempTable("sightings")

And that Spark would use the dates in the query to only select the partitions that match there WHERE filter.

val leftDF = spark.sql("SELECT * FROM sightings WHERE DATE(day) BETWEEN "2016-01-09" AND "2016-01-10"")
val audienceDF = leftDF.join(ghDF, Seq("gh9"))
audienceDF.select( approxCountDistinct("device_id", red = 0.01).as("distinct"), sum("requests").as("avails") ).show()

I am curious if casting the partition as DATE is causing this issue?

I've also been running some test with Athena/PrestoDB on the same dataset and it's very clear that only a few gigabytes of data is being scanned.

Is there any way for Spark to tell me how much data is going to be loaded before submitting a query?

1
have you tried to remove the lakeDF.cache() statement? You cold also study the physical plan ex given by df.explain() at the end of your transformations (before calling an action), maybe this gives you a hint - Raphael Roth
Yes lakeDF.cache() was the issue. - jspooner

1 Answers

1
votes

The issue was caused by calling lakeDF.cache() before the filter was applied.