Can you expand on this answer, or is there documentation that shows how to
link the Service Bus topic to an Azure Function and then dump the messages into Data
Lake?
I'm not sure which language you use, so here is an implementation in Python.
You can use a Service Bus trigger to listen for messages arriving on a Service Bus queue or topic, and then use the Data Lake SDK to save each message.
Use an Azure Functions Service Bus trigger to receive the message (__init__.py):
import logging

import azure.functions as func


def main(msg: func.ServiceBusMessage):
    # Put the logic that processes the message here.
    logging.info('Python ServiceBus queue trigger processed message: %s',
                 msg.get_body().decode('utf-8'))
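function.json (for a topic subscription, use "topicName" and "subscriptionName" in place of "queueName"; "connection" is the name of the app setting that holds the Service Bus connection string):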
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "queuename",
      "connection": "bowman1012_SERVICEBUS"
    }
  ]
}
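As a sketch of the "process the message" step, the helper below turns one message into a newline-terminated JSON record that is ready to be appended to a file. The name `message_to_line` and the record layout are assumptions made for illustration; it only relies on the message exposing `get_body()` (used above) and a `message_id` attribute, which `func.ServiceBusMessage` provides.

```python
import json


def message_to_line(msg) -> bytes:
    """Serialize one message as a newline-terminated JSON record (UTF-8 bytes).

    `msg` is assumed to behave like func.ServiceBusMessage:
    it has a `message_id` attribute and `get_body()` returns bytes.
    """
    record = {
        "message_id": msg.message_id,
        # errors="replace" keeps a malformed payload from failing the invocation.
        "body": msg.get_body().decode("utf-8", errors="replace"),
    }
    return (json.dumps(record, ensure_ascii=False) + "\n").encode("utf-8")
```

Inside `main`, you could call `message_to_line(msg)` and pass the resulting bytes to whatever code writes to Data Lake.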
Then use code like the following to append messages to a file in Data Lake:
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.filedatalake import DataLakeServiceClient

# Placeholder connection string; in a function app, read it from an app setting instead of hard-coding it.
connect_str = "DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx;EndpointSuffix=core.windows.net"
datalake_service_client = DataLakeServiceClient.from_connection_string(connect_str)

myfilesystem = "test"
myfolder = "test"
myfile = "FileName.txt"

file_system_client = datalake_service_client.get_file_system_client(myfilesystem)
directory_client = file_system_client.create_directory(myfolder)
file_client = directory_client.get_file_client(myfile)

# Encode first so that len(data) is a byte count, which is what the offsets use.
data = "Test2".encode("utf-8")

try:
    # The file exists: append after its current end.
    filesize_previous = file_client.get_file_properties().size
except ResourceNotFoundError:
    # The file does not exist yet: create it and start at offset 0.
    file_client = directory_client.create_file(myfile)
    filesize_previous = 0

print("length of data is " + str(len(data)))
print("length of currentfile is " + str(filesize_previous))
file_client.append_data(data, offset=filesize_previous, length=len(data))
file_client.flush_data(filesize_previous + len(data))
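To connect the two pieces, the append logic can be wrapped in a small function that the trigger calls for every message. This is only a minimal sketch: the name `append_bytes` is hypothetical, and it assumes `file_client` is a `DataLakeFileClient` for a file that already exists. It uses only the calls shown above (`get_file_properties`, `append_data`, `flush_data`).

```python
def append_bytes(file_client, payload: bytes) -> int:
    """Append payload to the end of an existing Data Lake file.

    Returns the file size after the append.
    Assumption: nothing else writes to the file at the same time.
    """
    # The current size is the offset at which the new bytes start.
    offset = file_client.get_file_properties().size
    file_client.append_data(payload, offset=offset, length=len(payload))
    # flush_data takes the total length of the file after the append.
    new_size = offset + len(payload)
    file_client.flush_data(new_size)
    return new_size
```

In `main` you would then call something like `append_bytes(file_client, msg.get_body() + b"\n")`. If several function instances may append to the same file at once, the offsets can collide; that case is not handled here.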
If you need to develop the Azure Function locally, you need Azure Functions Core Tools, the language runtime (here, Python), and VS Code with the Azure Functions extension.
For more information, have a look at these pages:
https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local?tabs=windows%2Ccsharp%2Cbash
https://docs.microsoft.com/en-us/azure/azure-functions/functions-reference-python
https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=python
And this is the API reference for the Data Lake SDK (the same site documents the Python SDKs for the other Azure services):
https://docs.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.datalakeserviceclient?view=azure-python