4
votes

I can't find any documentation online on using Python to subscribe to and consume messages from an Azure Event Hub. I know it's possible in C, C#, and Java. I just need to know whether it is possible in Python.

The Azure Python SDK currently seems to support only sending messages, not opening an async connection to continuously receive messages from the Event Hub. http://azure-sdk-for-python.readthedocs.org/en/latest/servicebus.html#event-hub

3
I'd suggest revising your question to focus on how to consume events in Python, or asking about using Web Sockets from Django. Right now it's not entirely clear what you are asking. - cacsar
Thank you Cesar, I've revised my question to ask whether it is possible to subscribe to or consume data from an Azure Event Hub. - Phuc H Duong
Hi @PhucHDuong, did you resolve this using Python? I am trying to solve a similar scenario using Python and Spark Streaming. Could you please share the approach you implemented? - ankush reddy

3 Answers

3
votes

The only way I've found to connect to Event Hubs from Python is to use the python-qpid-proton library (available as a PyPI module).

This is because Event Hubs uses AMQP 1.0 over TLS, so most of the other libraries you'll find won't work (they implement AMQP 0.9 or earlier).

I'm still hoping to find a solution that's easier to use with Python on Windows, but this should work on OS X and Linux boxes just fine.
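
For anyone trying this route, an AMQP 1.0 client needs an address that names the namespace, the event hub, a consumer group, and a partition. The following is only a rough sketch of how such an address could be assembled. The helper name build_receiver_address and all argument values are made up for illustration, and the path layout is my assumption based on the Event Hubs AMQP addressing scheme, so check it against the current documentation.

```python
from urllib.parse import quote


def build_receiver_address(namespace, eventhub, key_name, key,
                           consumer_group="$Default", partition="0"):
    """Build an amqps:// URL for one partition of an Event Hub.

    Hypothetical helper: the path layout
    <hub>/ConsumerGroups/<group>/Partitions/<id> is an assumption,
    and every argument value used below is a placeholder.
    """
    # SAS keys often contain '/', '+' and '=', so percent-encode credentials.
    user = quote(key_name, safe="")
    password = quote(key, safe="")
    host = "{}.servicebus.windows.net".format(namespace)
    path = "{}/ConsumerGroups/{}/Partitions/{}".format(
        eventhub, consumer_group, partition)
    return "amqps://{}:{}@{}/{}".format(user, password, host, path)


if __name__ == "__main__":
    # Placeholder values only; substitute your own namespace, hub and key.
    print(build_receiver_address("mynamespace", "myhub",
                                 "listen-policy", "abc/def+ghi="))
```

A URL of this shape is what you would then hand to python-qpid-proton when opening a receiver. I have left the proton calls out because the exact API depends on the version you install.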

2
votes

Without knowing the detailed requirements of your dashboard, we can only give you some high-level advice. In a nutshell, you can think of your dashboard as a consumer group reading from the event hub (with one receiver per partition if there are multiple partitions, as each receiver connects to a single partition). You just need to use AMQP to connect to the event hub. In Python you can use an AMQP library such as https://pypi.python.org/pypi/amqp. Don't worry: reading the events in your dashboard does not delete them, so they remain available to other consumer groups. AMQP is a standard, so you just need to give the library the event hub's address and authentication info, and then you can connect to the event hub.
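
To make the "events are not deleted" point concrete, here is a toy in-memory model of one partition with independent consumer groups. It is only a sketch of the idea and not how the service is implemented: the PartitionLog class and the group names are invented for illustration, and details such as retention periods are ignored.

```python
class PartitionLog:
    """Toy stand-in for one Event Hub partition: an append-only list."""

    def __init__(self):
        self._events = []
        self._offsets = {}  # consumer group name -> next index to read

    def send(self, event):
        # Appending never disturbs any reader's position.
        self._events.append(event)

    def receive(self, consumer_group, max_events=10):
        # Each consumer group tracks its own offset; reading advances
        # only that group's offset and removes nothing from the log.
        start = self._offsets.get(consumer_group, 0)
        batch = self._events[start:start + max_events]
        self._offsets[consumer_group] = start + len(batch)
        return batch


if __name__ == "__main__":
    log = PartitionLog()
    log.send("reading-1")
    log.send("reading-2")
    print(log.receive("dashboard"))  # the dashboard group sees both events
    print(log.receive("archiver"))   # a second group still sees both events
    print(log.receive("dashboard"))  # nothing new for the dashboard group
```

The design point is that the read position belongs to the reader, not to the log, which is why one group consuming events has no effect on another.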

1
votes

It might be too late for your project, but the azure-eventhub Python SDK is now available and can both send events to and receive events from the Event Hub service.

I'll post the information here to help users who come looking for the SDK later.

azure-eventhub v5 is available on PyPI: https://pypi.org/project/azure-eventhub.

There is also a migration guide from v1 to v5 to help those using the v1 SDK migrate their programs smoothly.

To consume messages from Event Hub, follow this sample code:

#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
An example to show receiving events from an Event Hub.
"""
import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']


def on_event(partition_context, event):
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    print("Received event from partition: {}.".format(partition_context.partition_id))


def on_partition_initialize(partition_context):
    # Put your code here.
    print("Partition: {} has been initialized.".format(partition_context.partition_id))


def on_partition_close(partition_context, reason):
    # Put your code here.
    print("Partition: {} has been closed, reason for closing: {}.".format(
        partition_context.partition_id,
        reason
    ))


def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        # Argument order must match the placeholders: error first, then partition id.
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))


if __name__ == '__main__':
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        consumer_group='$Default',
        eventhub_name=EVENTHUB_NAME,
    )

    try:
        with consumer_client:
            consumer_client.receive(
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print('Stopped receiving.')
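
The on_event callback above only prints the partition id. As a rough sketch of what a fuller handler might look like, the version below decodes the body and records progress. It assumes every event body is UTF-8 JSON, which is a guess about the producer and not something Event Hubs guarantees, and the logger name is made up. It relies on event.body_as_str() and partition_context.update_checkpoint() from the v5 SDK.

```python
import json
import logging

logger = logging.getLogger("eventhub.consumer")  # hypothetical logger name


def on_event(partition_context, event):
    """Decode one event as JSON and record progress.

    Assumption: the producer sends UTF-8 encoded JSON bodies.
    """
    if event is None:
        # Can happen when receive() is called with max_wait_time set.
        return None
    try:
        payload = json.loads(event.body_as_str(encoding="UTF-8"))
    except ValueError:
        # Covers both invalid JSON and undecodable bytes.
        logger.warning("Skipping non-JSON event on partition %s",
                       partition_context.partition_id)
        payload = None
    # Without a checkpoint store configured on the client,
    # this call does not persist anything.
    partition_context.update_checkpoint(event)
    return payload
```

The handler can be passed as on_event to consumer_client.receive() in place of the one in the sample. To have checkpoints survive restarts you would also need to configure a checkpoint store on the client, which is not shown here.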