1 vote

I am designing a new data landscape and am currently developing my proof of concept. For it I use the following architecture: Azure Functions --> Azure Event Hub --> Azure Blob Storage --> Azure Data Factory --> Azure Databricks --> Azure SQL Server.

What I am struggling with at the moment is how to optimize the "data retrieval" that feeds my ETL process on Azure Databricks.

I am handling transactional factory data that is submitted to Azure Blob Storage by the minute via the channels in front of it. I end up with 86000 files each day that need to be handled, which is indeed a huge number of separate files to process. Currently I use the following piece of code to build a list of the filenames present on the blob storage, and then I retrieve them by reading each file in a loop.

The problem I am facing is the time this process takes. Of course, we are talking about an enormous number of small files that need to be read, so I am not expecting this process to complete in a few minutes.

I am aware that scaling up the Databricks cluster may solve the problem, but I am not sure that alone will solve it, given the number of files I need to transfer in this case. I am running the following code on Databricks.

# Define a function that recursively lists the contents of a mounted folder
def get_dir_content(ls_path):
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths

# Collect every file path under the mount point
mount_point = "PATH"
filenames = get_dir_content(mount_point)

import pandas as pd

# Keep only the Avro files under the mounted path
avroFile = pd.DataFrame(filenames)
avroFileList = avroFile[(avroFile[0].str.contains('.avro')) & (avroFile[0].str.contains('dbfs:/mnt/PATH'))]

# Read each Avro file into its own Spark dataframe
avro_result = []
for i in avroFileList[0]:
  avro_file = spark.read.format("avro").load(i)
  avro_result.append(avro_file)

Finally, I union all of these dataframes to create one dataframe out of them.

# Take the schema from the first dataframe
avro_df = avro_result[0]

# Union the remaining dataframes onto it
for i in avro_result[1:]:
  avro_df = avro_df.union(i)

display(avro_df)

I am wondering how to optimize this process. The reason for the per-minute output is that we are planning to build "near real-time insights" later on, once we have the analytical reporting architecture (for which we only need a daily process) in place.


2 Answers

1 vote

Instead of listing files and then reading them separately, I would recommend taking a look at Azure Databricks Auto Loader. It can use notifications to discover which new files were uploaded to the blob storage instead of listing files.

It also works with multiple files at a time, instead of reading them one by one and doing the union.

If you don't need continuous processing of the data, you can use .trigger(once=True) to emulate a batch load of the data.
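
For illustration, a minimal sketch of that approach (mine, not part of the original answer); the input, output and checkpoint paths below are placeholders, and notification mode needs some extra Azure setup (an Event Grid subscription and a storage queue) behind the scenes:

# Minimal Auto Loader sketch; all paths are placeholders, not values from the question
input_path = "dbfs:/mnt/PATH/"
output_path = "dbfs:/mnt/PATH_bronze/"
checkpoint_path = "dbfs:/mnt/checkpoints/avro_autoloader"

# Incrementally discover new Avro files via file notifications instead of listing
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "avro")
      .option("cloudFiles.useNotifications", "true")
      .load(input_path))

# trigger(once=True) processes whatever is new and then stops,
# which emulates a daily batch run
(df.writeStream
   .format("delta")
   .option("checkpointLocation", checkpoint_path)
   .trigger(once=True)
   .start(output_path))

The checkpoint is what lets Auto Loader remember which files have already been ingested, so each run only picks up the new ones.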

1 vote

There are multiple ways to do that, but here's what I would do:

Use Azure Functions to trigger your Python code whenever a new blob is created in your Azure storage account. This removes the polling part of your code and sends the data to Databricks as soon as a file is available on your storage account.
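
As a rough sketch of that idea (my illustration, not from the original answer), a blob-triggered Python function could start an existing Databricks job through the Jobs API run-now endpoint; the environment variable names and the job itself are assumptions:

import logging
import os

import requests                 # assumption: available in the function app
import azure.functions as func


def main(myblob: func.InputStream):
    # Runs whenever a new blob lands in the monitored container
    logging.info("New blob arrived: %s (%s bytes)", myblob.name, myblob.length)

    # Hypothetical app settings pointing at an existing Databricks job
    workspace_url = os.environ["DATABRICKS_WORKSPACE_URL"]
    token = os.environ["DATABRICKS_TOKEN"]
    job_id = int(os.environ["DATABRICKS_JOB_ID"])

    # Trigger the job and pass the blob path as a notebook parameter
    resp = requests.post(
        f"{workspace_url}/api/2.1/jobs/run-now",
        headers={"Authorization": f"Bearer {token}"},
        json={"job_id": job_id, "notebook_params": {"blob_path": myblob.name}},
    )
    resp.raise_for_status()

The function stays tiny; the heavy lifting still happens in Databricks, the function only tells it that a new file has arrived.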

For the near real-time reporting, you can use Azure Stream Analytics to run queries on the Event Hub and output to Power BI, for example.