1
votes

I have the following Azure Storage queue-triggered Azure Function, which is bound to an Azure table for its output.

[FunctionName("TestFunction")]
public static async Task Run(
    [QueueTrigger("myqueue", Connection = "connection")] string myQueueItem,
    [Table("TableXyzObject"), StorageAccount("connection")] IAsyncCollector<TableXyzObject> tableXyzObjectRecords)
{
    // A queue-triggered function has no HTTP response, so it returns Task rather than IActionResult.
    var tableAbcObject = new TableXyzObject();

    try
    {
        tableAbcObject.PartitionKey = DateTime.UtcNow.ToString("MMddyyyy");
        tableAbcObject.RowKey = Guid.NewGuid().ToString(); // RowKey is a string, not a Guid
        tableAbcObject.RandomString = myQueueItem;

        await tableXyzObjectRecords.AddAsync(tableAbcObject);
    }
    catch (Exception ex)
    {
        // Exceptions are swallowed here (no logging or rethrow).
    }
}

public class TableXyzObject : TableEntity
{
    public string RandomString { get; set; }
}

I am looking for a way to read 15 messages from a poison queue, which is different from myqueue (the queue that triggers the function above), and batch insert them into a dynamically chosen table (tableXyz, tableAbc, etc.) based on a few conditions in the queue message. Since we have several poison queues, we want to pick up messages from multiple poison queues (the name of the poison queue will be provided in the myqueue message). The goal is to avoid spinning up a new Azure Function every time we add a new poison queue.

This is the approach I have in mind:
--> Create a new QueueClient and get 15 queue messages with its ReceiveMessages(15) method from the Azure.Storage.Queues package
--> Do a batch insert using the TableBatchOperation class (I cannot use the output binding)

Is there a better approach than this?
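One constraint on the batch-insert step is worth spelling out: a table batch can only target one table and one PartitionKey, and it can hold at most 100 operations. If messages are routed to different tables, they have to be grouped before any TableBatchOperation is built. The following is a minimal sketch of that grouping only, with no storage calls. The RoutedRow type and the BatchPlanner name are hypothetical and exist only for illustration; how a message maps to a table is left out because the conditions are not specified here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape: a poison message after its target table has been decided.
public sealed class RoutedRow
{
    public RoutedRow(string tableName, string partitionKey, string payload)
    {
        TableName = tableName;
        PartitionKey = partitionKey;
        Payload = payload;
    }

    public string TableName { get; }
    public string PartitionKey { get; }
    public string Payload { get; }
}

public static class BatchPlanner
{
    // Azure Table storage accepts at most 100 operations per batch.
    public const int MaxBatchSize = 100;

    // Groups rows by (table, partition key), then splits each group
    // into chunks of at most MaxBatchSize rows. Each returned list can
    // become one TableBatchOperation.
    public static List<List<RoutedRow>> Plan(IEnumerable<RoutedRow> rows)
    {
        return rows
            .GroupBy(r => (r.TableName, r.PartitionKey))
            .SelectMany(group => group
                .Select((row, index) => (row, index))
                .GroupBy(x => x.index / MaxBatchSize, x => x.row)
                .Select(chunk => chunk.ToList()))
            .ToList();
    }
}
```

With only 15 messages per read the 100-operation limit will not be hit, but the grouping by table and PartitionKey still applies as soon as two messages go to different tables.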

1
You're using a QueueTrigger, but then want to call ReceiveMessages() inside the queue-triggered function against the same queue? Azure Functions already processes storage queue messages concurrently (the default batch size is 16). Using ReceiveMessages() doesn't seem to make sense. Can you explain? If you cannot use the output binding for whatever reason, then doing the inserts with a TableBatchOperation makes sense. - Bryan Lewis
Sorry for the confusion. The queue that triggers this function (myqueue) is different from the queue I want to read messages from. - Vicky

1 Answer

2
votes

Unfortunately, storage queues don't have a great solution for this. If you want it to be dynamic, then implementing your own queue clients and table writes is probably your best option. The one thing I would suggest changing is to use a timer trigger instead of a queue trigger. If you put a message on your trigger queue every time something lands in a poison queue, it would work as is; if not, a timer trigger ensures that poisoned messages are handled in a timely fashion.
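A rough sketch of what that could look like, to be read as an illustration under stated assumptions rather than a tested implementation. It assumes the Azure.Storage.Queues package for the queue client and the Microsoft.Azure.Cosmos.Table package for TableEntity and TableBatchOperation (the question does not say which table SDK is in use). The function name, the PoisonRecord type, the five-minute schedule, and the hard-coded "myqueue-poison" and "TableXyzObject" names are placeholders; in the real function the queue and table names would be chosen dynamically.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;            // assumed package: Azure.Storage.Queues
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Cosmos.Table;    // assumed package: Microsoft.Azure.Cosmos.Table
using Microsoft.Azure.WebJobs;

// Placeholder entity, mirroring TableXyzObject from the question.
public class PoisonRecord : TableEntity
{
    public string RandomString { get; set; }
}

public static class PoisonQueueDrainer
{
    [FunctionName("DrainPoisonQueue")]
    public static async Task Run([TimerTrigger("0 */5 * * * *")] TimerInfo timer)
    {
        // "connection" is the app setting name used in the question.
        string connectionString = Environment.GetEnvironmentVariable("connection");

        // Placeholder queue name; pick it dynamically in real code.
        var queue = new QueueClient(connectionString, "myqueue-poison");

        // A single call returns at most maxMessages messages (the service caps this at 32).
        QueueMessage[] messages = (await queue.ReceiveMessagesAsync(maxMessages: 15)).Value;
        if (messages.Length == 0)
        {
            return;
        }

        // Placeholder table name; pick it based on the message content in real code.
        CloudTable table = CloudStorageAccount.Parse(connectionString)
            .CreateCloudTableClient()
            .GetTableReference("TableXyzObject");
        await table.CreateIfNotExistsAsync();

        // Every entity in one batch must share a PartitionKey, so compute it once.
        string partitionKey = DateTime.UtcNow.ToString("MMddyyyy");
        var batch = new TableBatchOperation();
        foreach (QueueMessage message in messages)
        {
            batch.Insert(new PoisonRecord
            {
                PartitionKey = partitionKey,
                RowKey = Guid.NewGuid().ToString(),
                RandomString = message.MessageText
            });
        }
        await table.ExecuteBatchAsync(batch);

        // Delete the messages only after the batch insert has succeeded.
        foreach (QueueMessage message in messages)
        {
            await queue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
        }
    }
}
```

Two caveats. Received messages become visible again after the visibility timeout if they are not deleted, so a failed batch insert leaves them in the poison queue for the next run. Depending on the version of the Functions storage extension that wrote the poison messages, the message text may be Base64-encoded and need decoding before it is stored.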

Original Answer (incorrectly relating to Service Bus queues)

Bryan is correct that creating a new queue client inside your function isn't the best way to go about this. Fortunately, the Service Bus extension does allow batching. Unfortunately, the docs haven't quite caught up yet.

Just make your trigger receive an array:

[ServiceBusTrigger("myqueue", Connection = "connection")] string[] myQueueItems

You can set your max batch size in the host.json:

"extensions": {
  "serviceBus": {
    "batchOptions": {
      "maxMessageCount":  15
    }
  }
}