I would like to use the Electron golang wrapper for the Qpid proton-c library to connect to the Azure EventHub.
I am setting the following SASL details combined to the host/port/namespace/path required to build the connection string but for some reason I keep getting the error message: connection reset by peer
.
package main
import (
"fmt"
"os"
"strings"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
)
var (
eventHubNamespaceName = "<MY_CUSTOM_NAMESPACE>"
eventHubName = "<MY_CUSTOM_NAME>"
eventHubSasKeyName = "<MY_CUSTOM_SAS_KEY_NAME>"
eventHubSasKey = "<MY_CUSTOM_SAS_KEY>" // this is the base64 encoded stuff
)
func main() {
sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))
urlStr := fmt.Sprintf("amqp://%s.servicebus.windows.net:5671/%s", eventHubNamespaceName, eventHubName)
fmt.Printf("The URL connection string: '%v'\n", urlStr)
// parse URL
url, err := amqp.ParseURL(urlStr)
if err != nil {
panic(err)
}
fmt.Printf("The AMQP parsed URL: %v\n", url)
// TCP dial
amqpHost := url.Host
fmt.Printf("The AMQP host used in the connection is: '%v'\n", amqpHost)
c, err := container.Dial(
"tcp", amqpHost,
electron.SASLEnable(),
electron.Password([]byte(eventHubSasKey)),
electron.User(eventHubSasKeyName),
)
if err != nil {
panic(err)
}
defer c.Close(nil)
// AMQP send
addr := strings.TrimPrefix(url.Path, "/")
s, err := c.Sender(electron.Target(addr))
if err != nil {
panic(err)
}
m := amqp.NewMessage()
body := fmt.Sprintf("bla bla bla %v", 42)
m.Marshal(body)
fmt.Printf("The AMQP message body: '%v'\n", m.Body())
go s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
// AMQP ACK receive
fmt.Printf("Waiting for ACKs...\n")
for {
fmt.Printf("Waiting for an ACK coming out of the channel...\n")
out := <-sentChan // Outcome of async sends.
fmt.Printf("Received something: '%v'\n", out)
}
}
When compiling, then running the code, this is the output:
The URL connection string: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP parsed URL: 'amqp://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671/<MY_CUSTOM_NAME>'
The AMQP host used in the connection is: '<MY_CUSTOM_NAMESPACE>.servicebus.windows.net:5671'
The AMQP message body: 'bla bla bla 42'
Waiting for ACKs...
Waiting for an ACK coming out of the channel...
Received something: '{unsent : read tcp <MY_PRIVATE_IP_IN_LAN>:<SOME_PORT>-><THE_NSLOOKUP_IP_OF_THE_AZURE_EVENTHUB>:5671: read: connection reset by peer bla bla bla 42}'
Waiting for an ACK coming out of the channel...
To me that received message saying connection reset by peer
does not look like a valid ACK and I am not sure what is wrong with the connection attempt?
- The compiled version of
proton-c
is 0.18.0, I am usinggo1.7.4 linux/amd64
. - If I add
electron.SASLAllowedMechs("EXTERNAL")
to the connection options then I get the same error message. - If I change the port to
5672
, then I get aconnection refused
panic error after the attempt at dialing via TCP. - If I decode the base64 password field with
base64.StdEncoding.DecodeString(eventHubSasKey)
and pass the bytes to the connection options I keep getting the same errorconnection reset by peer
. - If I add this connection option
electron.SASLAllowedMechs("ANONYMOUS")
, then I still get the same error messageconnection reset by peer
. The reason for doing this is that I am not using any SSL certificate, and the Java wrapper to AMQP that Microsoft provides seems to use this "anonymous" thing instead of the certificate (in fact no certificate is needed to connect to the EventHub using the Java connector).
I am not sure how to proceed here as I am stuck in the connection part and I believe the SASL details are passed in the correct way according to the docs here: https://godoc.org/qpid.apache.org/electron#ConnectionOption
I am still not sure the reason of the failure is not due to SSL certificates, if that's the case I am struggling to see how to include them in the process.
Edit:
I later found out I had to establish a TLS connection over TCP even if I am not providing any private/public pair of keys, also specifying a "virtual host" (otherwise AMQP was complaining about not recognising the host):
// TLS connection details
tlsConfig := &tls.Config{}
eventHubDomainPort := fmt.Sprintf("%s.servicebus.windows.net:5671", eventHubNamespaceName)
tlsConn, err := tls.Dial("tcp", eventHubDomainPort, tlsConfig)
if err != nil {
panic(err)
}
// AMPQ container connection on top of TLS via TCP
eventHubDomain := fmt.Sprintf("%s.servicebus.windows.net", eventHubNamespaceName)
amqpConn, err := container.Connection(
tlsConn,
electron.SASLEnable(),
electron.User(eventHubSasKeyName),
electron.Password([]byte(eventHubSasKey)),
electron.VirtualHost(eventHubDomain),
// electron.SASLAllowedMechs(<SOME_MECHANISM>),
)
if err != nil {
panic(err)
}
defer amqpConn.Close(nil)
// AMQP sender (a AMQP link with target the name defined on the Azure portal)
s, err := amqpConn.Sender(electron.Target(eventHubName))
if err != nil {
panic(err)
}
However when running the app with the environment variable PN_TRACE_FRM=true
(which is giving me some verbose logging at the proton-c
level) now the error is:
[handle=0, closed=true, error=@error(29) [condition=:"amqp:unauthorized-access", description="Unauthorized access. 'Send' claim(s) are required to perform this operation. Resource: 'sb://<MY_CUSTOM_NAMESPACE>.servicebus.windows.net/<MY_CUSTOM_NAME>'. TrackingId:<SOME_UUID-ISH_HERE>, SystemTracker:<A_LABEL_HERE>, Timestamp:10/25/2017 4:02:58 PM"]]
This afaik means the SASL details (username/password) must be of type "sender" because I am trying to send something to the Event Hub. I double checked those details on the Azure portal (click on "Shared access policies" > then using the policy with "claim" specified as "Send") and they are correct. So I am not sure why I am getting this error.
I actually tried these SASL policies defined on the Azure portal at different levels, both <MY_CUSTOM_NAMESPACE>
and <MY_CUSTOM_NAME>
, but always the same error message.
I also tried including various SASL mechanisms e.g. when using electron.SASLAllowedMechs("PLAIN")
then I get this error: no mechanism available: No worthy mechs found (Authentication failed [mech=none])
.