I am designing a new data landscape and am currently developing my proof of concept. It uses the following architecture: Azure Functions --> Azure Event Hubs --> Azure Blob Storage --> Azure Data Factory --> Azure Databricks --> Azure SQL Server.
What I am struggling with at the moment is how to optimize "data retrieval" to feed the ETL process on Azure Databricks.
I am handling transactional factory data that is submitted every minute to Azure Blob Storage via the channels in front of it. That leaves me with about 86,000 files per day that need to be handled, which is obviously a huge number of separate files to process. Currently I use the code below to build a list of the filenames present on Blob Storage, and then read each file in a loop.
The problem I am facing is the time this process takes. Of course we are talking about an enormous number of small files that need to be read, so I am not expecting this process to complete in a few minutes.
I am aware that scaling up the Databricks cluster may help, but I am not certain that alone will solve it, given the number of files I need to transfer in this case. I am running the following code on Databricks.
# Define a function that recursively lists the content of the mounted folder
def get_dir_content(ls_path):
    dir_paths = dbutils.fs.ls(ls_path)
    # Recurse into subdirectories and flatten the results
    subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
    flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
    return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
filenames = []
mount_point = "PATH"
paths = get_dir_content(mount_point)
for p in paths:
    # print(p)
    filenames.append(p)
import pandas as pd

# Keep only the Avro files under the mounted path
avroFile = pd.DataFrame(filenames)
avroFileList = avroFile[(avroFile[0].str.contains('.avro', regex=False)) & (avroFile[0].str.contains('dbfs:/mnt/PATH'))]
avro_result = []
# avro_file = pd.DataFrame()
avro_complete = pd.DataFrame()

# Read every Avro file individually and collect the resulting DataFrames
for i in avroFileList[0]:
    avro_file = spark.read.format("avro").load(i)
    avro_result.append(avro_file)
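As a side note, as far as I know the Avro reader also accepts a list of paths in a single call, so the per-file loop could in principle be collapsed into one read. A minimal sketch of that variant, reusing the avroFileList built above (avro_paths and avro_df_single are just illustrative names):

# Sketch: pass the whole list of Avro paths to a single load() call
avro_paths = avroFileList[0].tolist()
avro_df_single = spark.read.format("avro").load(avro_paths)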
Finally, I union all of these files into one DataFrame:
# Define the schema based on the first DataFrame
avro_df = avro_result[0]

# Union all remaining DataFrames
for i in avro_result[1:]:
    avro_df = avro_df.union(i)

display(avro_df)
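A more compact way to write this union, assuming all the files share the same schema, is functools.reduce over the list of DataFrames:

from functools import reduce
from pyspark.sql import DataFrame

# Union every per-file DataFrame into a single DataFrame (assumes identical schemas)
avro_df = reduce(DataFrame.union, avro_result)
display(avro_df)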
I am wondering how to optimize this process. The reason for the per-minute output is that we plan to build "near real-time insights" later on, once the analytical reporting architecture (which only needs a daily process) is in place.
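Would a single read over the whole mount, something like the sketch below, be the right direction? The wildcard depth is only an assumption about the folder layout under the mount:

# Sketch: let Spark list and read every Avro file under the mount in one job,
# instead of listing and looping over individual files on the driver
# (the */*.avro pattern assumes one level of subfolders and may need adjusting)
avro_df = spark.read.format("avro").load("dbfs:/mnt/PATH/*/*.avro")
display(avro_df)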