0
votes

I am in need of processing several thousands small log files.

I opted for Databricks to handle this problem, because it has good parallel computing capacities and interacts nicely with the Azure Blob storage account where the files are hosted.

After some researching, I always retrieve the same snippet of code (in PySpark).

# Getting your list of files with custom function
list_of_files = get_my_files()

# Create a path_rdd and use a custom udf to parse it
path_rdd = sc.parallelize(list_of_files)
content = path_rdd.map(parse_udf).collect()

Is there a better any method to do this? Would you opt for a flatmap if the logfiles are in a CSV format?

Thank you!

1
If all your files are in same directory (or you can move them in same directory), you can read at the folder level (spark.read.format('csv').load("folder_name")) - this way you will leverage spark internal parallel processing instead parsing every file as a UDF. - Hussain Bohra
They are in a hierarchical directory structure "{location}/{YYYY}{MM}". Do I get any performance gain if I change my list_of_files to a list_of_lowest_dirs in that case? - bramb
If you can get directory structure updated to "location/year=YYYY/month=MM/date=DD/*.csv" - then while reading folder like df = spark.read.format("csv").option("header", "true").load("cars_data/") will automatically add year, month and date as a column, which you can utilize for filter and that will certainly provide you performance gain. - Hussain Bohra
@HussainBohra You don’t have to use Hive partitioning. You could use the wildcard: spark.read.csv("location/*/*/"). - Oliver W.
@OliverW. thanks for the tip! Can you also do custom comment header parsing with read.csv? - bramb

1 Answers

0
votes

My current solution is:

content = sc.wholeTextFiles('/mnt/container/foo/*/*/', numPartitions=XX)
parsed_content = content.flatMap(custom_parser).collect()

I read all the content of the files as a string and keep their filenames. I then pass this to my parsing function "custom_parser" using a flatMap, "where custom_parser" is defined as

def custom_parser(*argv):
    file, content = argv
    # Apply magic
    return parsed_content_

I am currently finishing with a .collect() action, but I will alter this to save the output directly.