Update:
For production usage, I suggest you use the stable version of the Event Hubs SDK. You can use EPH (Event Processor Host); the sample code is here.
With the pre-release azure-eventhub 5.0.0b6, I was able to use a consumer group and also set checkpoints.
One thing that looks strange at first: in blob storage, two folders are created for the event hub, "checkpoint" and "ownership". Inside them there is one blob per partition, but every blob is empty. Even so, each time I read from the event hub, I only get new data; it never re-reads data that has already been read by the same consumer group. This is expected: the checkpoint store keeps the checkpoint position (offset and sequence number) and the ownership information in the blobs' metadata rather than in their content, so the blobs look empty even though they hold the state.
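To illustrate why empty blobs are enough, here is a small in-memory model of how a blob-backed checkpoint store behaves. It is a hypothetical sketch, not the Azure SDK: FakeBlob, InMemoryCheckpointStore and receive_partition are made-up names, the blob naming scheme and the metadata keys are assumptions modelled on the "checkpoint" folder described above, and when no checkpoint exists this model simply starts from the first event (the real client decides this through its starting position setting).

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# (sequence_number, offset, body) for one partition; offsets kept as strings.
Event = Tuple[int, str, str]


@dataclass
class FakeBlob:
    # Checkpoint blobs carry no payload: the position lives in metadata.
    content: bytes = b""
    metadata: Dict[str, str] = field(default_factory=dict)


class InMemoryCheckpointStore:
    """Hypothetical stand-in for a blob-backed checkpoint store."""

    def __init__(self) -> None:
        self.blobs: Dict[str, FakeBlob] = {}

    @staticmethod
    def blob_name(namespace: str, eventhub: str, group: str, partition: str) -> str:
        # Assumed layout: one "checkpoint" blob per partition per consumer group.
        return f"{namespace}/{eventhub}/{group}/checkpoint/{partition}"

    def update_checkpoint(self, namespace: str, eventhub: str, group: str,
                          partition: str, offset: str, sequence_number: int) -> None:
        # Overwrite the blob with empty content plus position metadata.
        name = self.blob_name(namespace, eventhub, group, partition)
        self.blobs[name] = FakeBlob(
            b"", {"offset": offset, "sequencenumber": str(sequence_number)})

    def last_sequence_number(self, namespace: str, eventhub: str, group: str,
                             partition: str) -> Optional[int]:
        blob = self.blobs.get(self.blob_name(namespace, eventhub, group, partition))
        return int(blob.metadata["sequencenumber"]) if blob else None


def receive_partition(store: InMemoryCheckpointStore, events: List[Event],
                      namespace: str, eventhub: str, group: str,
                      partition: str = "0") -> List[str]:
    """Return bodies of events after the group's checkpoint, checkpointing each."""
    last = store.last_sequence_number(namespace, eventhub, group, partition)
    received = []
    for sequence_number, offset, body in events:
        if last is not None and sequence_number <= last:
            continue  # already processed by this consumer group
        received.append(body)
        store.update_checkpoint(namespace, eventhub, group, partition,
                                offset, sequence_number)
    return received
```

Calling receive_partition twice over the same growing list of events returns only the new events the second time, a different consumer group starts over from the beginning, and every blob's content stays b"" throughout.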
You need to install azure-eventhub 5.0.0b6, and use pip install --pre azure-eventhub-checkpointstoreblob to install azure-eventhub-checkpointstoreblob. For blob storage, install azure-storage-blob version 12.1.0 (the latest version at the time of writing).
I followed this sample. It uses an event hub level connection string (NOT an event hub namespace level connection string). To create one, navigate in the Azure portal to your Event Hubs namespace -> your event hub instance -> Shared access policies -> click "Add", then specify a policy name and select the permissions. If you only want to receive data, selecting just the Listen permission is enough. See the screenshot below:
After the policy is created, you can copy the connection string as shown in the screenshot below:
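Because the sample depends on the connection string being event hub level, a quick way to tell the two kinds apart is to look for the EntityPath segment, which names the event hub (myeventhub in the code below). The helper here is a hypothetical sketch: parse_connection_string and is_event_hub_level are not SDK functions, and it assumes the usual semicolon-separated Key=Value format.

```python
from typing import Dict


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Split a 'Key=Value;Key=Value' connection string into a dict."""
    parts: Dict[str, str] = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only, so base64 keys ending in '=' stay intact.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def is_event_hub_level(conn_str: str) -> bool:
    """True if the string is scoped to one event hub (has a non-empty EntityPath)."""
    return bool(parse_connection_string(conn_str).get("EntityPath"))
```

A namespace level string copied from the namespace's own Shared access policies has no EntityPath, so is_event_hub_level returns False for it.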
Then you can use the code below:
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Event hub level connection string: note the EntityPath=<event hub name> at the end
CONNECTION_STR = 'Endpoint=sb://ivanehubns.servicebus.windows.net/;SharedAccessKeyName=saspolicy;SharedAccessKey=xxx;EntityPath=myeventhub'
STORAGE_CONNECTION_STR = 'DefaultEndpointsProtocol=https;AccountName=xx;AccountKey=xxx;EndpointSuffix=core.windows.net'


def on_event(partition_context, event):
    # do something with the event
    print(event)
    print('on event')
    # record this event's position so the consumer group resumes after it next time
    partition_context.update_checkpoint(event)


if __name__ == '__main__':
    # "a22" is the blob container name used as the checkpoint store
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "a22")
    # "$default" is the consumer group
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR, "$default", checkpoint_store=checkpoint_store)
    try:
        print('ok')
        # receive() blocks and calls on_event for each incoming event
        client.receive(on_event)
    except KeyboardInterrupt:
        pass
    finally:
        # close the client however receive() exits
        client.close()
The test result: