1 vote

I have set up an Azure Event Hub and I am sending AMQP messages in JSON format from a Python script, and am attempting to stream those messages to Power BI using Stream Analytics. The messages are very simple device activity events from an IoT device.

The Python snippet is

msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf8')
message.body = msg
messenger.put(message)
messenger.send()

I have used the example C# message reader from the MS tutorial to read the data back from the event hub with no problem; the output is:

Message received.  Partition: '2', Data: '??{"DeviceUID": "z_70b3d515200002e7_0", "Signal": "/on?1", "DeviceID": "1", "Hub": "91754623489", "Timestamp": "2016-07-15T07:56:50.277440Z"}'

But when I try to test the Stream Analytics input from the Event Hub, I get an error

Diagnostics: Could not deserialize the input event as Json. Some possible reasons: 1) Malformed events 2) Input source configured with incorrect serialization format

I'm not sure what "Malformed events" means - I had assumed that Stream Analytics can cope with data sent to an Event Hub via AMQP?

I can't see anything wrong with the JSON as received by the C# app - unless the BOM symbol at the start is causing a problem?
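(For what it's worth, the suspicion is easy to check locally: Python's json module also refuses a payload with a leading BOM character. A minimal sketch, using a shortened version of the message above:)

```python
import json

payload = '{"DeviceID": "1", "Signal": "/on?1"}'

# A clean JSON string parses fine.
assert json.loads(payload)["DeviceID"] == "1"

# The same string with a leading BOM does not.
try:
    json.loads('\ufeff' + payload)
    bom_parsed = True
except ValueError:
    bom_parsed = False
```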

This is my first attempt at all this, and I have searched for any similar posts to no avail, so I'd really appreciate it if someone could point me in the right direction.

Cheers Rob


3 Answers

3 votes

This is caused by a client API incompatibility. Python uses Proton to send the JSON string in the body of an AMQP value message. The body is encoded as an AMQP string (AMQP type-encoding bytes plus the UTF-8 encoded bytes of the string). Stream Analytics uses the Service Bus .NET SDK, which exposes an AMQP message as EventData, whose body is always a byte array. For an AMQP value message, that byte array includes the AMQP type-encoding bytes, because without them it is not possible to decode the value that follows. These extra bytes at the beginning cause JSON deserialization to fail.
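(To illustrate the effect those prefix bytes have, here is a sketch that mimics what the consumer sees. The exact bytes depend on the string's length - 0xA1 plus a one-byte length is the str8-utf8 encoding used for short strings; longer strings get a different format code - so treat the constants as illustrative, not authoritative:)

```python
import json

payload = json.dumps({"DeviceID": "1", "Signal": "/on?1"}).encode("utf-8")

# An AMQP value message prefixes the string with type-encoding bytes:
# here 0xA1 (str8-utf8 format code) followed by a one-byte length.
amqp_value_body = bytes([0xA1, len(payload)]) + payload

# Deserializing the raw body as JSON fails because of the prefix...
try:
    json.loads(amqp_value_body)
    raw_parsed = True
except ValueError:
    raw_parsed = False

# ...but succeeds once the two type-encoding bytes are stripped.
stripped = json.loads(amqp_value_body[2:])
```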

To achieve interoperability on the message body, the application should ensure the publisher and consumer agree on its type and encoding. In this case the publisher should send raw bytes in an AMQP Data message. With the Proton Python API, you can try this:

message.body = msg.encode('utf-8')

The other workaround is to send simple types (e.g. strings) in the application properties instead of the body.

Other people have also run into this issue: https://github.com/Azure/amqpnetlite/issues/117

1 vote

As @XinChen said, the issue was caused by the AMQP protocol.

In my experience, the two workarounds below are effective for this case.

  1. Use the Send Event REST API instead of the Azure Python SDK with AMQP. Note that the REST API is based on HTTP, so it will not be as performant.
  2. Send the JSON message Base64-encoded, then decode the received message back to a JSON string.
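(The second workaround can be sketched with the standard library alone; the event dict below is just sample data shaped like the OP's messages:)

```python
import base64
import json

event = {"Hub": "91754623489", "DeviceID": "1", "Signal": "/on?1"}

# Publisher side: dict -> JSON -> UTF-8 bytes -> Base64 text.
encoded = base64.b64encode(json.dumps(event).encode("utf-8"))

# Consumer side: Base64 -> UTF-8 bytes -> JSON string -> dict.
decoded = json.loads(base64.b64decode(encoded).decode("utf-8"))
```

Base64 output is plain ASCII, so it survives the transport untouched regardless of how the AMQP body is framed; the cost is roughly a 33% size increase per message.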
0 votes

These two things worked for me:

  • add message.inferred = True
  • make sure you're specifying the dumps encoding as encoding='utf-8', not encoding='utf8' as in your example.

Updated OP:

# encoding='utf-8' (note the hyphen) per the second point above
msg = json.dumps({ "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz }, ensure_ascii=False, encoding='utf-8')
message.body = msg
message.inferred = True  # encode the body as an AMQP Data section
messenger.put(message)
messenger.send()

By adding the inferred flag, I think the message serializer can properly infer that the body is bytes and create an AMQP Data section, thus addressing @XinChen's point.

The inferred flag for a message indicates how the message content is encoded into AMQP sections. If inferred is true then binary and list values in the body of the message will be encoded as AMQP DATA and AMQP SEQUENCE sections, respectively. If inferred is false, then all values in the body of the message will be encoded as AMQP VALUE sections regardless of their type.

re: Qpid Proton Docs #inferred

re: JSON Encoder and Decoder #dumps