1 vote

I have two tables. Both are external tables in Hive, stored in Parquet format.

The first table, table_1, has about 250 million rows per day going back to 2015. It is partitioned on create_date, so each create_date holds roughly 250M rows.

The second table, table_2, is a daily delta table with an average row count of about 1.5 million rows.

There is one common column, "lookup_id", in both tables. I need to fetch all columns from table_1 for the delta data in table_2, using DataFrames.

I thought of doing something like this:

table_1 = spark.table("table_1")
table_2 = spark.table("table_2")
result_df = table_1.join(table_2, table_1.lookup_id == table_2.lookup_id, "inner").drop(table_2.lookup_id)

But I doubt whether this is really efficient and whether PySpark will be able to handle it without memory errors.

Question 1: How to parallelize the table_1 scan based on create_date partitions?

Question 2: Is there any other way to optimize table_1 scan based on the lookup_ids from table_2 and/or based on partitions?

Additional info to give more clarity on what I am looking for:

I am trying to understand whether, when we join the tables using DataFrames, Spark reads all the data into memory and then joins it, or whether it joins while reading. If the latter is true, which join types does that apply to? Also, is there any need to use a loop to avoid memory errors?

Comments:

gudok: Is there any relation between table1.create_date and table2.create_date? For example, is it true that if row1.lookup_id == row2.lookup_id then row1.create_date == row2.create_date, for row1 ∈ table1 and row2 ∈ table2?

Mohan: No, only lookup_id is available; create_date is not available.

2 Answers

2 votes

Not sure about your driver and executor memory, but in general two possible join optimizations are broadcasting the small table to all executors and using the same partition key for both DataFrames. In your case, repartitioning both sides on lookup_id will make the join faster if table_2 is too large to be broadcast, but repartitioning has its own cost. You can find more here - https://umbertogriffo.gitbook.io/apache-spark-best-practices-and-tuning/avoiding_shuffle_less_stage-_more_fast#:~:text=One%20way%20to%20avoid%20shuffles,then%20broadcast%20to%20every%20executor.
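For illustration, here is a rough sketch of both options applied to your tables (the table and column names are taken from the question; the partition count of 400 is an assumption you would tune for your cluster):

from pyspark.sql.functions import broadcast

table_1 = spark.table("table_1")
table_2 = spark.table("table_2")

# Option A: broadcast the small delta table so each executor joins its slice of
# table_1 locally, with no shuffle (only viable if table_2 fits in executor memory).
result_broadcast = table_1.join(broadcast(table_2), on="lookup_id", how="inner")

# Option B: if table_2 is too big to broadcast, co-partition both sides on the
# join key so the data is shuffled once, up front.
t1 = table_1.repartition(400, "lookup_id")
t2 = table_2.repartition(400, "lookup_id")
result_repartitioned = t1.join(t2, on="lookup_id", how="inner")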

Let me know what you think; I'm looking forward to a discussion on this topic.

If you cannot broadcast, here is an example of avoiding the shuffle in the join by using bucketing, inspired by: Spark: Prevent shuffle/exchange when joining two identically partitioned dataframes

spark.catalog.setCurrentDatabase("<your database name>")

# write both DataFrames bucketed on the join column
test1.write.mode('overwrite').bucketBy(100, 'item').saveAsTable('table_item')
test2.write.mode('overwrite').bucketBy(100, 'item').saveAsTable('table_item1')

# disable auto broadcasting, just so this test shows the sort-merge join plan
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

inputDf1 = spark.sql("select * from table_item")
inputDf2 = spark.sql("select * from table_item1")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"), on='item')

Now try

inputDf3.explain()

The result will be something like this:

== Physical Plan ==
*(3) Project [item#1033, col1#1030, col2#1031, col3#1032, id#1038]
+- *(3) SortMergeJoin [item#1033], [item#1039], Inner
   :- *(1) Sort [item#1033 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [col1#1030, col2#1031, col3#1032, item#1033]
   :     +- *(1) Filter isnotnull(item#1033)
   :        +- *(1) FileScan parquet 
   +- *(2) Sort [item#1039 ASC NULLS FIRST], false, 0
      +- *(2) Project [id#1038, item#1039]
         +- *(2) Filter isnotnull(item#1039)
            +- *(2) FileScan parquet 

As you can see, there is no Exchange hashpartitioning step here. So try bucketing both of your DataFrames and then joining them, as in the sketch below.
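Applied to your case, that could look like the following (the bucket count of 200 and the bucketed table names are assumptions; both tables are bucketed and sorted on lookup_id so the join needs neither an Exchange nor a Sort):

(spark.table("table_1").write.mode("overwrite")
    .bucketBy(200, "lookup_id").sortBy("lookup_id")
    .saveAsTable("table_1_bucketed"))

(spark.table("table_2").write.mode("overwrite")
    .bucketBy(200, "lookup_id").sortBy("lookup_id")
    .saveAsTable("table_2_bucketed"))

big = spark.table("table_1_bucketed")
delta = spark.table("table_2_bucketed")
result_df = big.join(delta, on="lookup_id", how="inner")
result_df.explain()   # the plan should show a SortMergeJoin with no Exchange on either side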

1 vote

When you read a file, it will be automatically partitioned and processed in parallel, based on the default configuration (assuming you aren't changing any).

A more specific answer: if you have a 30 GB uncompressed text file stored on HDFS, then with the default HDFS block size setting (128 MB) it would be stored in 235 blocks, which means that the RDD you read from this file would have 235 partitions.

Now, there are two things here: 1. flat files like CSV, and 2. compressed files like Parquet.

  1. For a text file: when Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is set by the Hadoop InputFormat used to read the file. For instance, if you use textFile() it would be TextInputFormat in Hadoop, which returns a single partition for a single HDFS block (but the split between partitions is done on line boundaries, not the exact block boundary), unless you have a compressed text file.

  2. For a Parquet or compressed file: in the case of a compressed file you get a single partition for a single file (as compressed text files are not splittable).

Now, as you are using Parquet, the data is already well partitioned. While optimizing, you can check your cluster size, see how many partitions were actually created, and so on, for example as shown below.
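For example, a quick way to inspect how many partitions Spark created for the scan (the DataFrame name reuses the one from the question; the create_date value is just an example):

table_1 = spark.table("table_1")
print(table_1.rdd.getNumPartitions())   # partitions for a full scan of the table

# restricting the scan to one create_date prunes the partition directories,
# so the partition count (and the data read) drops accordingly
one_day = table_1.filter(table_1.create_date == "2020-01-01")
print(one_day.rdd.getNumPartitions())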

So, the answer to Question 1 (How to parallelize the table_1 scan based on create_date partitions?): this is already partitioned.

For Question 2 (Is there any other way to optimize the table_1 scan based on the lookup_ids from table_2 and/or based on partitions?):

You can try filtering out the records which are not necessary. This concept is called predicate push down in Spark SQL queries: even before loading the data into memory, Spark will filter out the unnecessary rows. More on this below.

Spark predicate push down to database allows for better optimized Spark queries. A predicate is a condition on a query that returns true or false, typically located in the WHERE clause. A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default the Spark Dataset API will automatically push down valid WHERE clauses to the database.
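A minimal sketch of how that applies to your tables, assuming you do know a create_date range that covers the delta rows (the date value is only an example; check the FileScan node of the plan for PartitionFilters / PushedFilters to confirm the filters were actually pushed down):

table_1 = spark.table("table_1")
table_2 = spark.table("table_2")

# filter on the partition column first so only the needed create_date
# directories are scanned (partition pruning), then join on lookup_id
filtered = table_1.filter(table_1.create_date >= "2023-01-01")
result_df = filtered.join(table_2, on="lookup_id", how="inner")
result_df.explain(True)   # look for PartitionFilters / PushedFilters in the FileScan node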