3 votes

Introduction

I have noticed that none of the pushdown filters in our project work. This explains why execution time is suffering: Spark reads millions of rows when the filter should have reduced that to a few thousand. To debug the problem, I have written a small test that reads a CSV file, filters the content (pushdown filter), and returns the result.

It hasn't worked with CSV, so I have also tried reading a Parquet file. Neither of them works.

Data

The people.csv file has the following structure:

first_name,last_name,city  // header
FirstName1,LastName1,Bern // 1st row
FirstName2,LastName2,Sion // 2nd row
FirstName3,LastName3,Bulle // 3rd row

N.B.: the Parquet file has the same structure.

Read CSV file

To reproduce the problem, I have written minimal code that reads a CSV file and should return only the filtered data.

Read the CSV file and print the physical plan:

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
ds.where(col("city").equalTo("Bern")).show();
ds.explain(true);

Output of show() and explain():

+----------+---------+----+
|first_name|last_name|city|
+----------+---------+----+
|FirstName1|LastName1|Bern|
+----------+---------+----+

== Parsed Logical Plan ==
Relation[first_name#10,last_name#11,city#12] csv

== Analyzed Logical Plan ==
first_name: string, last_name: string, city: string
Relation[first_name#10,last_name#11,city#12] csv

== Optimized Logical Plan ==
Relation[first_name#10,last_name#11,city#12] csv

== Physical Plan ==
*(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:people.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<first_name:string,last_name:string,city:string>

I have tested with a Parquet file and the result is unfortunately the same.

What we can notice:

  • PushedFilters is empty; I would have expected it to contain the predicate.
  • The returned result is nevertheless correct.

My question is: why is PushedFilters empty?

N.B:

  • Spark-Version: 2.4.3
  • File system: ext4 (and HDFS on the cluster, which didn't work either)
Are you sure that it is not working with Parquet files? What version of Spark are you using? Where are the files stored? (HDFS? S3? ...) – Oli
Hi Oli! I have tried with both (CSV and Parquet files); neither of them is working. As for the Spark version, I have added it at the end of the question. – KeyMaker00

2 Answers

6 votes

You are calling explain on the first dataset, the one with just the read. Try something like this (sorry, I only had the Scala environment available):

val ds: DataFrame = spark.read.option("header", "true").csv("input.csv")
val f = ds.filter(col("city").equalTo("Bern"))

f.explain(true)

f.show()

Also, be careful when using the typed Dataset API because of this. It shouldn't be your case, though.
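To illustrate that warning, here is a minimal Java sketch (assuming the same ds as in the question; the variable names are illustrative): a filter written as an arbitrary lambda is opaque to Catalyst, so nothing gets pushed down, while a Column expression can be translated into a pushed filter.

import org.apache.spark.api.java.function.FilterFunction;
import static org.apache.spark.sql.functions.col;

// Column-based filter: Catalyst understands the predicate and pushes it down.
Dataset<Row> pushed = ds.filter(col("city").equalTo("Bern"));
pushed.explain(true);  // PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]

// Lambda-based (typed) filter: the predicate is a black box for the optimizer,
// so PushedFilters stays empty even though the returned rows are the same.
Dataset<Row> notPushed = ds.filter(
        (FilterFunction<Row>) row -> "Bern".equals(row.<String>getAs("city")));
notPushed.explain(true);  // PushedFilters: []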

1 vote

Just for the sake of documentation, here is the solution (thanks LizardKing):

Result

  • before: PushedFilters: []
  • after: PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]

Code

Dataset<Row> ds = sparkSession.read().option("header", "true").csv(BASE_PATH+"people.csv");
Dataset<Row> dsFiltered = ds.where(col("city").equalTo("Bern"));
dsFiltered.explain(true);

Physical Plan

The Physical Plan looks much better:

== Parsed Logical Plan ==
'Filter ('city = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv

== Analyzed Logical Plan ==
first_name: string, last_name: string, city: string
Filter (city#12 = Bern)
+- Relation[first_name#10,last_name#11,city#12] csv

== Optimized Logical Plan ==
Filter (isnotnull(city#12) && (city#12 = Bern))
+- Relation[first_name#10,last_name#11,city#12] csv

== Physical Plan ==
*(1) Project [first_name#10, last_name#11, city#12]
+- *(1) Filter (isnotnull(city#12) && (city#12 = Bern))
   +- *(1) FileScan csv [first_name#10,last_name#11,city#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:./people.csv], PartitionFilters: [], PushedFilters: [IsNotNull(city), EqualTo(city,Bern)], ReadSchema: struct<first_name:string,last_name:string,city:string>
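
Since the question also mentions Parquet, the same corrected pattern applies there. A sketch, assuming a people.parquet file with the same schema under BASE_PATH (the file name is an assumption):

// Read the assumed people.parquet file and call explain on the filtered dataset.
Dataset<Row> dsParquet = sparkSession.read().parquet(BASE_PATH + "people.parquet");
Dataset<Row> dsParquetFiltered = dsParquet.where(col("city").equalTo("Bern"));
dsParquetFiltered.explain(true);
// The FileScan parquet node should again list:
// PushedFilters: [IsNotNull(city), EqualTo(city,Bern)]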