
I would like to recursively find the maximum date-time value of each subfolder and, finally, the maximum timestamp of the top-level parent folder. Spark SQL is slow when I run this, so I would like to implement the logic using a UDF or DataFrame methods in PySpark.

+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

The output should show the timestamp for every row, with each null replaced by the maximum timestamp found under that folder at each level.

Output

+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+

I tried the SQL below, which gives the expected result, but it is too slow when the data frame holds millions of records. Caching the data frame did not help; LIKE is probably a costly operation. (FYI: I removed the time from the date format because it was not displaying correctly. The time format is not a concern here) but subfolders and folder should

df.show()
+-----------+---------------+----------------+---------+
|File_Folder|Folder_File_Ind|folder_level_ind|Timestamp|
+-----------+---------------+----------------+---------+
|         /A|  parent-Folder|               1|     null|
|       /A/B|     sub-folder|               2|     null|
| /A/B/1.txt|           file|               3| 02022021|
| /A/B/2.txt|           file|               4| 02032021|
|       /A/C|     sub-folder|               2|     null|
| /A/C/3.txt|           file|               3| 02042021|
| /A/C/4.txt|           file|               3| 02052021|
+-----------+---------------+----------------+---------+

>>> self_join_rec =  sqlc.sql("SELECT   \
...     a.File_Folder,  a.Folder_File_Ind, Max(b.Timestamp) Timestamp  \
...     FROM src_table a \
...     JOIN src_table b on b.File_Folder LIKE Concat(a.File_Folder, '%')   \
...     GROUP BY \
...     a.File_Folder,  a.Folder_File_Ind \
...     ORDER BY a.File_Folder,a.Folder_File_Ind"
... )
>>> self_join_rec.show()
+-----------+---------------+---------+
|File_Folder|Folder_File_Ind|Timestamp|
+-----------+---------------+---------+
|         /A|  parent-Folder| 02052021|
|       /A/B|     sub-folder| 02032021|
| /A/B/1.txt|           file| 02022021|
| /A/B/2.txt|           file| 02032021|
|       /A/C|     sub-folder| 02052021|
| /A/C/3.txt|           file| 02042021|
| /A/C/4.txt|           file| 02052021|
+-----------+---------------+---------+
What is the SQL you tried (the slow one you mentioned)? To handle this recursively, you can try a join with a condition using rlike or startswith. - jxc
I updated the question with the SQL that I tried using LIKE. It works, but it is too slow when it handles millions of records. - GeekSQL
You can find the max(Timestamp) of each folder (using groupby), do the self-join on this reduced dataframe, and then join the result back to the original dataframe. This joins significantly fewer rows than using the original df directly. (A broadcast join is likely available when using this reduced dataframe.) - jxc

1 Answer

  1. Add a column base_folder that contains only the folder part of the path, without the file name; it is used for joining.
  2. Group by base_folder and calculate the max timestamp.
  3. Join with the original dataframe using base_folder and take the max timestamp for rows where Timestamp is null.
from pyspark.sql import functions as F

# create new column base_folder
df = df.withColumn(
    "base_folder",
    F.when(
        F.col("Folder_File_Ind") == "file",
        F.regexp_extract("File_Folder", "(.*)/.*", 1)
    ).otherwise(F.col("File_Folder"))
)

df.cache()

# calculate max timestamp per group of folders
max_df = df.groupby("base_folder").agg(F.max("Timestamp").alias("max_timestamp")).filter("max_timestamp is not null")

# join df with max_df
df1 = df.alias("df").join(
    max_df.alias("max"),
    F.col("max.base_folder").startswith(F.col("df.base_folder")),
    "left"
).groupby("File_Folder").agg(
    F.first("Folder_File_Ind").alias("Folder_File_Ind"),
    F.first("folder_level_ind").alias("folder_level_ind"),
    F.coalesce(
        F.first("Timestamp"),
        F.max("max_timestamp")
    ).alias("Timestamp")
)

df1.show()

#+-----------+---------------+----------------+-----------------+
#|File_Folder|Folder_File_Ind|folder_level_ind|        Timestamp|
#+-----------+---------------+----------------+-----------------+
#|         /A| parent-Folder |               1| 02-FEB-2021 9 PM|
#| /A/C/4.txt|           file|               3|02-FEB-2021 11 AM|
#| /A/B/2.txt|           file|               3| 02-FEB-2021 9 PM|
#|       /A/C|    sub-folder |               2|02-FEB-2021 11 AM|
#| /A/C/3.txt|           file|               3|02-FEB-2021 11 AM|
#|       /A/B|     sub-folder|               2| 02-FEB-2021 9 PM|
#| /A/B/1.txt|           file|               3| 02-FEB-2021 9 PM|
#+-----------+---------------+----------------+-----------------+

Or using a SQL query with CTEs:

sql_query = """
WITH folders_data AS (
    SELECT  *,
            CASE WHEN Folder_File_Ind = 'file' THEN regexp_extract(File_Folder, '(.*)/.*', 1)
                 ELSE File_Folder
            END AS base_folder
    FROM    src_table
), 
max_per_folder AS (
    SELECT  base_folder,
            MAX(Timestamp) AS max_timestamp
    FROM    folders_data
    GROUP BY  base_folder
    HAVING  MAX(Timestamp) IS NOT NULL
)

SELECT  File_Folder,
        FIRST(Folder_File_Ind)      AS Folder_File_Ind,
        FIRST(folder_level_ind)     AS folder_level_ind,
        COALESCE(FIRST(Timestamp), MAX(max_timestamp))  AS Timestamp
FROM    folders_data AS t1
LEFT JOIN max_per_folder t2
ON      t2.base_folder LIKE CONCAT(t1.base_folder, '%')
GROUP BY File_Folder
"""

spark.sql(sql_query).show()
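
To sanity-check either version on a small sample, the same rule (a folder takes the newest timestamp of any file beneath it) can be sketched in plain Python. This is only a reference check, not a replacement for the Spark job. The function name fill_folder_timestamps, the row layout, and the "%m%d%Y" date format are assumptions made for illustration. Because it walks real parent paths, a sibling folder such as /AB (a hypothetical name, not in the sample data) is not counted under /A, which a bare prefix match would do.

```python
from datetime import datetime
from posixpath import dirname


def fill_folder_timestamps(rows, fmt="%m%d%Y"):
    """Fill null folder timestamps with the newest file timestamp beneath them.

    rows: list of (path, kind, timestamp_or_None) tuples (assumed layout).
    fmt:  strptime format of the timestamp strings (assumed MMDDYYYY).
    Returns a dict mapping path -> timestamp string (or None).
    """
    latest = {}  # folder path -> (parsed datetime, original string)
    for path, _kind, ts in rows:
        if ts is None:
            continue
        parsed = datetime.strptime(ts, fmt)
        # Walk up from the containing folder to the top-level folder,
        # keeping the newest timestamp seen for each ancestor.
        folder = dirname(path)
        while folder and folder != "/":
            if folder not in latest or parsed > latest[folder][0]:
                latest[folder] = (parsed, ts)
            folder = dirname(folder)

    filled = {}
    for path, _kind, ts in rows:
        if ts is not None:
            filled[path] = ts  # rows that already have a timestamp keep it
        else:
            filled[path] = latest[path][1] if path in latest else None
    return filled


# Sample rows copied from the question.
sample = [
    ("/A", "parent-Folder", None),
    ("/A/B", "sub-folder", None),
    ("/A/B/1.txt", "file", "02022021"),
    ("/A/B/2.txt", "file", "02032021"),
    ("/A/C", "sub-folder", None),
    ("/A/C/3.txt", "file", "02042021"),
    ("/A/C/4.txt", "file", "02052021"),
]

filled = fill_folder_timestamps(sample)
for path in sorted(filled):
    print(path, filled[path])
```

On the sample from the question this gives the same values as the expected output table. Timestamps are parsed before comparing because max() on date strings compares them as text, which only matches chronological order for some formats.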