
I would like to use Electron, the Go wrapper for the Qpid proton-c library, to connect to an Azure Event Hub.

I am passing the SASL details below, together with the host, port, namespace and path needed to build the connection string, but I keep getting the error message: connection reset by peer.

package main

import (
    "fmt"
    "os"
    "strings"
    "qpid.apache.org/amqp"
    "qpid.apache.org/electron"
)

var (
    eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
    eventHubName = "<MY_CUSTOM_NAME>"
    eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
    eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)

func main() {

    sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
    container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))

    urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
    fmt.Printf("The URL connection string: '%v'\n", urlStr)

    // parse URL
    url, err := amqp.ParseURL(urlStr)
    if err != nil {
        panic(err)
    }
    fmt.Printf("The AMQP parsed URL: %v\n", url)

    // TCP dial
    amqpHost := url.Host
    fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
    c, err := container.Dial(
        "tcp", amqpHost, 
        electron.SASLEnable(), 
        electron.Password([]byte(eventHubSasKey)), 
        electron.User(eventHubSasKeyName),
    )
    if err != nil {
        panic(err)
    }
    defer c.Close(nil)

    // AMQP send
    addr := strings.TrimPrefix(url.Path, "/")
    s, err := c.Sender(electron.Target(addr))
    if err != nil {
        panic(err)
    }
    m := amqp.NewMessage()
    body := fmt.Sprintf("bla bla bla %v", 42)
    m.Marshal(body)
    fmt.Printf("The AMQP message body: '%v'\n", m.Body())

    go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan

    // AMQP ACK receive
    fmt.Printf("Waiting for ACKs...\n")
    for {
        fmt.Printf("Waiting for an ACK coming out of the channel...\n")
        out := <-sentChan // Outcome of async sends.
        fmt.Printf("Received something: '%v'\n", out)
    }   
}

When compiling, then running the code, this is the output:

The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...

To me, the received outcome saying connection reset by peer does not look like a valid ACK, and I am not sure what is wrong with the connection attempt.

  • The compiled version of proton-c is 0.18.0, I am using go1.7.4 linux/amd64.
  • If I add electron.SASLAllowedMechs("EXTERNAL") to the connection options then I get the same error message.
  • If I change the port to 5672, then I get a connection refused panic error after the attempt at dialing via TCP.
  • If I decode the base64 password field with base64.StdEncoding.DecodeString(eventHubSasKey) and pass the bytes to the connection options I keep getting the same error connection reset by peer.
  • If I add this connection option electron.SASLAllowedMechs("ANONYMOUS"), then I still get the same error message connection reset by peer. The reason for doing this is that I am not using any SSL certificate, and the Java wrapper to AMQP that Microsoft provides seems to use this "anonymous" thing instead of the certificate (in fact no certificate is needed to connect to the EventHub using the Java connector).

I am not sure how to proceed here as I am stuck in the connection part and I believe the SASL details are passed in the correct way according to the docs here: https://godoc.org/qpid.apache.org/electron#ConnectionOption

I cannot rule out that the failure is caused by missing SSL certificates. If that is the case, I am struggling to see how to include them in the process.

Edit:

I later found out that I had to establish a TLS connection over TCP, even though I am not providing any private/public key pair, and that I also had to specify a "virtual host" (otherwise the AMQP peer complained about not recognising the host):

    // TLS connection details
    tlsConfig := &tls.Config{}
    eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
    tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
    if err != nil {
        panic(err)
    }

    // AMQP container connection on top of TLS via TCP
    eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
    amqpConn, err := container.Connection(
        tlsConn, 
        electron.SASLEnable(),
        electron.User(eventHubSasKeyName), 
        electron.Password([]byte(eventHubSasKey)),
        electron.VirtualHost(eventHubDomain),
        // electron.SASLAllowedMechs(<SOME_MECHANISM>),
    )
    if err != nil {
        panic(err)
    }
    defer amqpConn.Close(nil)

    // AMQP sender (an AMQP link whose target is the name defined on the Azure portal)
    s, err := amqpConn.Sender(electron.Target(eventHubName))
    if err != nil {
        panic(err)
    }

However, when I run the app with the environment variable PN_TRACE_FRM=true (which enables verbose frame logging at the proton-c level), the error is now:

[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]

As far as I know, this means the SASL credentials (username/password) must belong to a policy with the "Send" claim, because I am trying to send something to the Event Hub. I double-checked those details on the Azure portal (click on "Shared access policies", then use the policy whose "claim" is "Send") and they are correct, so I am not sure why I am getting this error.

I also tried shared access policies defined at different levels on the Azure portal, both on the namespace <MY_CUSTOM_NAMESPACE> and on the Event Hub <MY_CUSTOM_NAME>, but I always get the same error message.

I also tried including various SASL mechanisms e.g. when using electron.SASLAllowedMechs("PLAIN") then I get this error: no mechanism available: No worthy mechs found (Authentication failed [mech=none]).

3
The unauthorized-access error means the client is not doing SASL PLAIN or the SAS key info sent in sasl-init was not in the expected format. What is the mechanism in the sasl-init trace? – Xin Chen

3 Answers

1
votes

Use the "amqps" scheme in urlStr with port 5671; Event Hubs does not allow plain TCP connections. You will also need to enable SASL PLAIN to send a SAS key (user name = key name, password = key) configured on the namespace or on the event hub entity (it looks like you are already doing this). I am not sure about Go, but with the Python binding one can put everything in a URI like "amqps://<sas-key-name>:<sas-key>@<namespace>.servicebus.windows.net:5671". The port number is optional.

The underlying proton-c engine may not use SASL PLAIN if it sees a different supported SASL mechanism. To enforce PLAIN, you can set the allowed mechanism on the container. In Go, the SASLAllowedMechs function seems to give you a connection option that you can provide when creating the connection.

This is the Python code that works well with the Event Hubs.

0
votes

I managed to establish a connection using "Claims-based authorization" (CBS) on top of AMQP. It seems to be specific to Microsoft. Some details can be found at the bottom of this page: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-protocol-guide

Basically this is the list of steps:

  • TLS connection with electron.VirtualHost(eventHubDomain) and the ANONYMOUS SASL mechanism electron.SASLAllowedMechs("ANONYMOUS") (no need to specify a SASL username and password). See the Edit part of my question above for the details.
  • AMQP link for the special $cbs Event Hub name: cbsLink, err := amqpConnection.Sender(electron.Target("$cbs"))
  • Prepare an AMQP message with the Microsoft requirements for the CBS handshake:

The message properties (check this C# code to compare https://github.com/Azure/amqpnetlite/blob/master/Examples/ServiceBus/Scenarios/CbsAsyncExample.cs):

appProps := make(map[string]interface{})
appProps["operation"] = "put-token"
appProps["type"] = "servicebus.windows.net:sastoken"
appProps["name"] = "amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>"

The SAS token must be formatted the way Microsoft expects. I adapted this piece of code: https://github.com/michaelbironneau/asbclient/blob/master/azure.go as follows:

aqClient := newClient(Queue, "<MY_CUSTOM_NAMESPACE>", "<MY_CUSTOM_SAS_KEY_NAME>", "<MY_CUSTOM_SAS_KEY>")
sasToken := aqClient.authHeader("amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>", aqClient.signatureExpiry(time.Now()))

That piece of code is based on the Python SDK here: https://github.com/Azure/azure-sdk-for-python/blob/master/azure-servicebus/azure/servicebus/servicebusservice.py, which handles details such as upper/lower-case URL encoding, combined with expiry timestamps and the SASL username and password.

Build the AMQP message importing "qpid.apache.org/amqp":

cbsHandshakeMsg := amqp.NewMessage()
cbsHandshakeMsg.SetApplicationProperties(appProps)
cbsHandshakeMsg.Marshal(sasToken)
  • Send this AMQP message with outcome := cbsLink.SendSync(cbsHandshakeMsg), after which you should be authenticated to the Event Hub for a while.
  • Set up the AMQP link to the Event Hub name you wanted to connect to in the first place: msgSender, err := amqpConnection.Sender(electron.Target("<MY_CUSTOM_NAME>"))

Now you can send the message you want to send using this last AMQP link this way:

m := amqp.NewMessage()
m.Marshal("my message: bla bla bla, foo bar baz!")
outcome := msgSender.SendSync(m)

Done :)

Running this code with the environment variable PN_TRACE_FRM=true helps a lot in troubleshooting AMQP because the proton-c library logs lots of useful debug messages.

For some reason, the SASL PLAIN mechanism, which passes the SASL username and password directly during the connection attempt, does not work for me with the Event Hub. It may be an issue on their side or with the Electron/Qpid libraries, I am not sure, but at least it is now possible to send messages from Go using the CBS protocol Microsoft made available.

0
votes

TLS is required, as described in the Azure AMQP protocol guide.

After setting up the connection and TLS, Service Bus offers two SASL mechanism options:

  1. SASL PLAIN is commonly used for passing username and password credentials to a server. Service Bus does not have accounts, but named Shared Access Security rules, which confer rights and are associated with a key. The name of a rule is used as the user name and the key (as base64 encoded text) is used as the password. The rights associated with the chosen rule govern the operations allowed on the connection.
  2. SASL ANONYMOUS is used for bypassing SASL authorization when the client wants to use the claims-based-security (CBS) model that is described later. With this option, a client connection can be established anonymously for a short time during which the client can only interact with the CBS endpoint and the CBS handshake must complete.

We can choose either SASL PLAIN or CBS for authentication. Taking PLAIN as an example, I modified your code a little and it works as expected. The magic part is the connection options below:

amqpConn, err := container.Connection(
    tlsConn,
    electron.SASLEnable(),
    electron.Password([]byte(eventHubSasKey)),
    electron.User(eventHubSasKeyName),
    electron.VirtualHost(eventHubDomain),
    electron.SASLAllowInsecure(true),
    electron.SASLAllowedMechs("PLAIN"),
)

SASLAllowInsecure returns a ConnectionOption that allows or disallows clear-text SASL authentication mechanisms. It should be set to true if we choose to use SASL PLAIN.

Hope it helps.