
I have a Python function with a blob input binding. The blob in question contains a Parquet file. Ultimately I want to read the bound blob into a pandas DataFrame, but I am unsure of the correct way to do this.

I have verified that the binding is set up correctly, and I've been able to successfully read a plain text file through it. I'm also confident that the integrity of the Parquet file is fine, as I have been able to read it using the example provided here: https://arrow.apache.org/docs/python/parquet.html#reading-a-parquet-file-from-azure-blob-storage
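
For reference, this is roughly the verification step I used (a minimal sketch, assuming the azure-storage-blob v12 client; the connection string, container, and blob names are placeholders):

import io

import pyarrow.parquet as pq
from azure.storage.blob import BlobServiceClient

# Download the blob directly and hand the bytes to pyarrow
service = BlobServiceClient.from_connection_string("<connection-string>")
blob_client = service.get_blob_client(container="<container>", blob="file.parquet")

table = pq.read_table(io.BytesIO(blob_client.download_blob().readall()))
print(table.num_rows)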

The following code shows what I am trying to do:


import logging
import io

import azure.functions as func
import pyarrow.parquet as pq


def main(req: func.HttpRequest, inputblob: func.InputStream) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    # Copy the blob content into an in-memory bytestream
    byte_stream = io.BytesIO()
    byte_stream.write(inputblob.read())

    # This is the line that fails
    df = pq.read_table(source=byte_stream).to_pandas()

    return func.HttpResponse(f"Read {len(df.index)} rows")

I get the following error message:

pyarrow.lib.ArrowIOError: Couldn't deserialize thrift: TProtocolException: Invalid data

The following is my function.json file:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    },
    {
      "name": "inputblob",
      "type": "blob",
      "path": "<container>/file.parquet",
      "connection": "AzureWebJobsStorage",
      "direction": "in"
    }
  ]
}

My host.json file:

{
    "version":  "2.0",
    "functionTimeout": "00:10:00",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[1.*, 2.0.0)"
    }
}

1 Answer


I have been working on the same problem, and this solution worked for me.

__init__.py file:

from io import BytesIO

import azure.functions as func
import pandas as pd


def main(blobTrigger: func.InputStream):

    # Read the blob as bytes and wrap it in a fresh, seekable in-memory stream
    blob_bytes = blobTrigger.read()
    blob_to_read = BytesIO(blob_bytes)
    df = pd.read_parquet(blob_to_read, engine='pyarrow')
    print("Length of the parquet file: " + str(len(df.index)))