1 vote

Just a few simple questions on the actual mechanism behind reading a file from S3 into an EMR cluster with Spark:

Does spark.read.format("com.databricks.spark.csv").load("s3://my/dataset/").where($"state" === "WA") copy the whole dataset into the EMR cluster's local HDFS and only then apply the filter? Does it filter records as it brings the dataset into the cluster? Or does it do neither? In that case, what is actually happening?

The official documentation doesn't explain what's going on (or if it does, I can't find the explanation). Can someone explain, or link to a resource that does?


3 Answers

2 votes
  • Loading data from S3 (s3:// URIs) on EMR usually goes through EMRFS
  • EMRFS accesses S3 directly, not via HDFS
  • When Spark loads data from S3, it is held in the cluster as a Dataset according to the StorageLevel (memory or disk)
  • Finally, Spark applies the filter to the loaded data (see the sketch below)
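
To see where the filter actually lands, it helps to look at the query plan. A minimal sketch in Scala (the path and column name come from the question; the header option and app name are illustrative), using the built-in CSV reader:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("s3-read-plan")
      .getOrCreate()
    import spark.implicits._

    // EMRFS/s3a streams the objects straight from S3; nothing is copied into HDFS first.
    val df = spark.read
      .option("header", "true")
      .csv("s3://my/dataset/")
      .where($"state" === "WA")   // lazy: no data is read until an action runs

    // The physical plan shows the filter sitting on top of the file scan, so rows are
    // filtered as each partition is read, not after a full download of the dataset.
    df.explain(true)

Calling df.explain(true) before any action is a cheap way to confirm what Spark will actually do with the S3 read.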
2 votes

I can't speak for the closed-source AWS connector, but the ASF s3a: connector does its work in S3AInputStream.

Reading data goes over HTTPS, which has a slow startup time, and if you need to stop the download before the GET has finished, you are forced to abort the TCP stream and create a new one.

To keep this cost down, the code has features like:

  • Lazy seek: when you do a seek(), it updates its internal pointer but doesn't issue a new GET until you actually do a read.

  • Chooses whether to abort() a GET or read to the end, based on how much data is left

  • Has 3 IO modes:

"sequential", GET content range is from (pos, EOF). Best bandwidth, worst performance on seek. For: CSV, .gz, ...

"random": small GETs, min(block-size, length(read)). Best for columnar data (ORC, Parquet) compressed in a seekable format (snappy)

"adaptive" (new last week, based on some work from microsoft on the Azure WASB connector). Starts off sequential, as soon as you do a backwards seek switches to random IO

The code is all there, and improvements are welcome. The current perf work (especially random IO) is based on TPC-DS benchmarking of ORC data on Hive, BTW.

Assuming you are reading CSV and filtering there, it'll read the entire CSV file and then filter. This is horribly inefficient for large files. Best to import the data into a columnar format and use predicate pushdown, so the layers below can seek around the file, filtering and reading only the columns you need; a sketch of that conversion follows.
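
As a rough illustration of that workflow (the output path and the app name are hypothetical), a one-off conversion to Parquet followed by a filtered read that benefits from predicate pushdown might look like this:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate()
    import spark.implicits._

    // One-off conversion: read the CSV once, write it back as a columnar Parquet copy.
    spark.read
      .option("header", "true")
      .csv("s3://my/dataset/")
      .write
      .mode("overwrite")
      .parquet("s3://my/dataset-parquet/")   // hypothetical output path

    // Later queries hit the Parquet copy; the where() becomes a pushed-down predicate,
    // so row groups that cannot match are skipped and only the requested columns are read.
    val wa = spark.read.parquet("s3://my/dataset-parquet/")
      .where($"state" === "WA")

    wa.explain(true)   // look for PushedFilters: [IsNotNull(state), EqualTo(state,WA)]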

1 vote

When you specify files located on S3, they are read into the cluster; the processing happens on the cluster nodes.

However, this may be changing with S3 Select, which is now in preview.
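
S3 Select would let the filter run inside S3 itself, so only matching records cross the network. A hedged sketch of how this later surfaced on EMR, via the s3selectCSV data source (available from around EMR 5.17; the format name and option names depend on your EMR release, so treat this as illustrative only):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("s3-select-sketch").getOrCreate()
    import spark.implicits._

    // With S3 Select, the state = 'WA' predicate is evaluated inside S3 and only
    // matching rows are sent back to the executors.
    val wa = spark.read
      .format("s3selectCSV")              // EMR's S3 Select data source
      .option("header", "true")
      .load("s3://my/dataset/")
      .where($"state" === "WA")

    wa.show()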