
I want to read all messages from an Azure Storage queue and write them into a blob. Ideally, I would like to read them in batches of 10,000 or more and write each batch to a blob.

I am using Azure Functions with a Queue Storage binding for input and a Blob Storage binding for output, but I can't seem to find an API or a configuration option that would let me read more than one message per invocation. Does anyone know of such an API?


2 Answers


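The official documentation doesn't mention any support for processing Storage Queue messages in batches within a single execution of an Azure Function. The queue trigger may fetch several messages from the queue at once, but the function is still invoked once per message. There is an open issue about this in the WebJobs SDK, so it's not supported.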

If you are flexible about which messaging service you use, you could switch to Event Hubs. The Event Hub trigger supports (and encourages) processing messages in batches. A batch probably won't reach 10,000 messages, though: the batch size is limited to 256 KB of data.
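For comparison, a batched Event Hub trigger in Node.js receives an array of events per invocation. This is a minimal sketch. It assumes `function.json` declares the trigger with `"cardinality": "many"` and an output blob binding named `outputBlob`, and that each event is a flat object. The binding name and the `toCsvRow` helper are hypothetical.

```javascript
// Hypothetical helper: one CSV row per event, assuming each event is a flat
// object whose values are already in the desired column order. Values that
// contain commas, quotes or newlines are wrapped in quotes.
function toCsvRow(event) {
  return Object.values(event)
    .map(v => {
      const s = String(v)
      return /[",\r\n]/.test(s) ? '"' + s.replace(/"/g, '""') + '"' : s
    })
    .join(',')
}

// Event Hub trigger entry point. With "cardinality": "many" in function.json,
// the second argument is an array holding the whole batch.
async function processEventBatch(context, eventHubMessages) {
  context.log(`Received a batch of ${eventHubMessages.length} events`)
  const rows = eventHubMessages.map(toCsvRow)
  // The whole batch is written through the (assumed) "outputBlob" binding at once.
  context.bindings.outputBlob = rows.join('\n') + '\n'
}

module.exports = processEventBatch
```

Each invocation produces one blob per batch. The blob path in `function.json` therefore needs something unique per invocation, such as the `{rand-guid}` binding expression, so that batches don't overwrite each other.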

To process Storage Queue messages in batches, you'd have to move away from queue-triggered functions. For example, you could run a function on a timer that connects to the queue storage and processes all the messages, write a custom polling WebJob, or use the WebJobs SDK with a custom trigger.
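The timer-based option could look roughly like the following. It drains up to `maxMessages` messages, requesting at most 32 per call (the Storage Queue maximum per receive). It then writes them as one blob and deletes them only after the write succeeds. This is a sketch that assumes the `@azure/storage-queue` v12 `QueueClient` methods `receiveMessages` and `deleteMessage`. The client and the `writeBlob` callback are passed in, and `drainQueue`, `archiveBatch` and `writeBlob` are hypothetical names.

```javascript
// Storage Queues return at most 32 messages per receive call.
const MAX_PER_RECEIVE = 32

// Receive messages until maxMessages are collected or the queue comes back empty.
async function drainQueue(queueClient, maxMessages, visibilityTimeoutSeconds = 300) {
  const received = []
  while (received.length < maxMessages) {
    const wanted = Math.min(MAX_PER_RECEIVE, maxMessages - received.length)
    const response = await queueClient.receiveMessages({
      numberOfMessages: wanted,
      visibilityTimeout: visibilityTimeoutSeconds
    })
    const items = response.receivedMessageItems
    if (items.length === 0) break // nothing visible right now
    received.push(...items)
  }
  return received
}

// Delete each message using the pop receipt from the receive that returned it.
async function deleteMessages(queueClient, items) {
  for (const item of items) {
    await queueClient.deleteMessage(item.messageId, item.popReceipt)
  }
}

// Hypothetical timer-triggered body: write the batch first, then delete it,
// so a crash in between causes duplicates rather than lost messages.
async function archiveBatch(queueClient, writeBlob, maxMessages) {
  const items = await drainQueue(queueClient, maxMessages)
  if (items.length === 0) return 0
  await writeBlob(items.map(i => i.messageText).join('\n') + '\n')
  await deleteMessages(queueClient, items)
  return items.length
}

module.exports = { drainQueue, archiveBatch }
```

Received messages stay invisible for `visibilityTimeoutSeconds`, so that timeout has to cover draining, writing and deleting the whole batch. If it expires early, the same message can be received twice.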


I finally found a solution I am perfectly happy with. Using buffers was not scalable. The execution time could easily exceed the 5-minute limit imposed by the Azure Functions runtime, and memory consumption was an obvious issue. Also, because I had to use a timer trigger, I would have needed some way to make sure all relevant messages were already in the queue at a given time.

What I did instead was use a normal queue trigger binding to receive each message, together with the Node Storage SDK, to implement a kind of "fake" streaming into an Append Blob. Every message is converted, one by one, into a CSV line and appended to the blob for that message's day.
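The answer's code relies on a `./lib/csvTransform` module whose source isn't shown. A hypothetical version of the two functions it calls might look like the sketch below. It assumes each queue message is an object with an ISO-8601 `timestamp` field and the made-up columns listed in `COLUMNS`. Neither the field names nor the day format come from the answer.

```javascript
// Hypothetical stand-in for ./lib/csvTransform. The column names are assumptions.
const COLUMNS = ['timestamp', 'deviceId', 'value']

// Quote a field (RFC 4180 style) if it contains a comma, quote or newline.
function escapeCsv(value) {
  const s = value === undefined || value === null ? '' : String(value)
  return /[",\r\n]/.test(s) ? '"' + s.replace(/"/g, '""') + '"' : s
}

// UTC day of the message, used as the blob name, e.g. "2018-03-07".
function determineDayFromMessage(message) {
  return new Date(message.timestamp).toISOString().slice(0, 10)
}

// One CSV line per message; the header row is only prepended for a new blob.
function transformMessageToCSV(message, withHeader) {
  const row = COLUMNS.map(c => escapeCsv(message[c])).join(',') + '\n'
  return withHeader ? COLUMNS.join(',') + '\n' + row : row
}

module.exports = { determineDayFromMessage, transformMessageToCSV }
```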

Here is my code for the queue-triggered function:

const config = require('./config/config.js')
// Legacy Node Storage SDK (since superseded by @azure/storage-blob)
const storage = require('azure-storage')
const csvTransformer = require('./lib/csvTransform')
const async = require('async')

module.exports = function (context, myQueueItem) {
  context.log(
    'JavaScript queue trigger function processed work item',
    myQueueItem
  )

  const blobService = storage.createBlobService(config.targetBlobConnection)
  // One append blob per day, named after the day the message belongs to
  const messageDayString = csvTransformer.determineDayFromMessage(myQueueItem)
  const blobName = messageDayString + '.csv'
  let csvMessage
  async.waterfall(
    [
      function (callback) {
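        // publicAccessLevel 'blob' allows anonymous read access to the blobs;
        // drop that option if the CSV files should stay private.
        blobService.createContainerIfNotExists(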
          config.targetBlobContainer,
          { publicAccessLevel: 'blob' },
          err => {
            callback(err)
          }
        )
      },
      function (callback) {
        blobService.doesBlobExist(
          config.targetBlobContainer,
          blobName,
          null,
          (err, blobResult) => {
            context.log('got blobResult: ', blobResult)
            callback(err, blobResult)
          }
        )
      },
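      function (blobResult, callback) {
        if (blobResult && blobResult.exists) {
          csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, false)
          // appendBlockFromText is meant for multiple writers; appendFromText is
          // documented for single-writer use only, and several queue-triggered
          // invocations can run in parallel.
          blobService.appendBlockFromText(
            config.targetBlobContainer,
            blobName,
            csvMessage,
            null,
            (err, appendedBlobResult) => {
              context.log('appended to existing blob: ', appendedBlobResult)
              callback(err, appendedBlobResult)
            }
          )
        } else {
          // Creates the blob with a header row. Caveat: if two invocations reach
          // this branch at the same time, the second create can replace the first blob.
          csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, true)
          blobService.createAppendBlobFromText(
            config.targetBlobContainer,
            blobName,
            csvMessage,
            null,
            (err, createdBlobResult) => {
              context.log('created new blob: ', createdBlobResult)
              callback(err, createdBlobResult)
            }
          )
        }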
      }
    ],
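    function (err) {
      if (err) {
        context.log.error('Error happened!')
        context.log.error(err)
        // Failing the invocation makes the runtime retry the queue message
        context.done(err)
      } else {
        context.log('appended CSV message to blob')
        // Also forwards the CSV line to an output queue binding from function.json
        context.bindings.outputQueueItem = csvMessage
        context.done()
      }
    }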
  )
}
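With the current `@azure/storage-blob` v12 package, the existence check and the create step can be merged into a single call. This removes the check-then-create race in the function above. The sketch below assumes the v12 `ContainerClient.getAppendBlobClient`, `AppendBlobClient.createIfNotExists` and `AppendBlobClient.appendBlock` methods. `appendCsvLine` is a hypothetical name, and the container client is passed in rather than created here.

```javascript
// Append one CSV line to an append blob, creating the blob (with a header row)
// if it doesn't exist yet. Returns true if this call created the blob.
async function appendCsvLine(containerClient, blobName, header, line) {
  const appendBlob = containerClient.getAppendBlobClient(blobName)
  // createIfNotExists never overwrites, so only one caller sees succeeded === true.
  const { succeeded } = await appendBlob.createIfNotExists()
  const text = succeeded ? header + line : line
  // appendBlock expects the content length in bytes, not characters.
  await appendBlob.appendBlock(text, Buffer.byteLength(text))
  return succeeded
}

module.exports = { appendCsvLine }
```

The header is still appended in a separate request after the create. Under concurrent writes, another invocation's line could therefore land before the header, but no existing data is ever overwritten.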