5 votes

I am writing a Parquet file to HDFS using the following command:

df.write.mode(SaveMode.Append).partitionBy("id").parquet(path)

Afterwards I am reading and filtering the file like this:

val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
    r.getLong(2), r.getString(3)))

val filteredData = data.filter(x => x.getString(0) == "1") // thingId is the first field of the mapped Row
filteredData.collect()

I would expect Spark to leverage the partitioning of the file and only read the partition with thingId = 1. In fact, Spark reads all partitions of the file, not just the filtered one (the partition with thingId = 1). Looking at the logs, I can see that it reads everything:

16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet
16/03/21 01:32:33 INFO ParquetRelation: Reading Parquet file(s) from hdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet

Is there anything I am missing? According to the docs, Spark should know from the filter that it only needs to read the partition with thingId = 1. Does anyone have an idea what the problem is?


1 Answer

6 votes

A few issues may prevent Spark from successfully "pushing down" predicates (i.e. applying the filter at the input-format level):

  1. filter pushdown is OFF: depending on the Spark version you are using, the predicate pushdown option (spark.sql.parquet.filterPushdown) might be turned off. It has been ON by default since Spark 1.5.0, so check your version and your configuration (see the configuration sketch at the end of this answer).

  2. filter is "opaque": this seems to be the case here: you're loading the parquet file, mapping each row to another Row (reordering columns?), and then using the filter overload that accepts a function. Spark can't "read" the function code and realize that it compares the partitioning column - to Spark, this is just a Row => Boolean function that can do all sorts of checks...

    For filter pushdown to work, you need to apply the filter before mapping the records into something that is "detached" from the original structure, and to use one of the filter overloads that takes an expression Spark can parse, for example:

    // assuming the relevant column name is "id" in the parquet structure
    val filtered = file.filter("id = 1")

    // or, with import org.apache.spark.sql.functions.col:
    val filtered = file.filter(col("id") === 1)

    // and only then:
    val data = filtered.map(r => Row(...))