3
votes

I have a Python script that reads from a stream, and whenever a new string is read, it pushes its content (a string) to a RabbitMQ queue.

The thing is that the stream might not send any messages for 1, 2 or even 9 hours, so I would like to keep the RabbitMQ connection open the whole time.

The problem is that when I create the connection and the channel:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
channel = self.connection.channel()
channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')

... and if after an hour a message arrives, I get this error:

  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/var/opt/rabbitmq-agent.py", line 34, in push_to_queue
    raise Exception("Error sending the message to the queue: " + format(e))
Exception: Error sending the message to the queue: Send message to publisher error: Channel allocation requires an open connection: <SelectConnection CLOSED socket=None params=<ConnectionParameters host=x port=xvirtual_host=/ ssl=False>>

I suppose this means the connection between the RabbitMQ server and the client has been closed in the meantime.

How can I avoid this? I would like something like a "please, always keep the connection alive" setting. Maybe setting a very large heartbeat in Pika's connection parameters? Something like this:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials, heartbeat=6000))

Any other cooler solutions would be highly appreciated.

Thanks in advance


2 Answers

5
votes

I would suggest checking the connection every time before sending a message and, if it is closed, simply reconnecting.

if not self.connection or self.connection.is_closed:
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
    channel = self.connection.channel()
    channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
-2
votes

You could try adding a heartbeat to your ConnectionParameters. This generates light traffic by sending a heartbeat frame at the given interval (in seconds), which keeps the connection exercised. Some firewalls and proxies tend to drop idle connections, and RabbitMQ itself closes a connection when it stops receiving heartbeats from the client.

import pika

# Set the connection parameters to connect to rabbit-server1 on port 5672
# on the / virtual host using the username "guest" and password "guest"
credentials = pika.PlainCredentials('guest', 'guest')
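# credentials must come before heartbeat: a positional argument
# cannot follow a keyword argument in Python
parameters = pika.ConnectionParameters('rabbit-server1',
                                       5672,
                                       '/',
                                       credentials,
                                       heartbeat=60)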

See here for pika documentation.
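
One thing to keep in mind: a BlockingConnection only sends heartbeats while it is processing I/O, so a script that sits idle for hours has to give it a chance to do that. Here is a rough sketch, assuming the script runs on an asyncio event loop (the traceback in the question suggests it does). The function name service_heartbeats and the 10-second interval are made up for illustration; process_data_events() and is_closed are the pika BlockingConnection members being relied on.

```python
import asyncio


async def service_heartbeats(connection, interval=10.0):
    """Periodically let a pika BlockingConnection process pending I/O.

    Hypothetical helper, not part of pika. `interval` should be
    comfortably shorter than the heartbeat configured on the connection.
    """
    while not connection.is_closed:
        # time_limit=0 does one pass over pending events (including
        # heartbeat frames) and returns without blocking the event loop.
        connection.process_data_events(time_limit=0)
        await asyncio.sleep(interval)
```

You would start it next to your stream reader with something like asyncio.ensure_future(service_heartbeats(self.connection)). Because it runs on the same event loop thread as the publishing code, it does not touch the connection from a second thread, which BlockingConnection does not support.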

Additionally, you should have code in place that copes with network disconnections. They can always happen, and they will. So apart from the heartbeat, have some exception handling ready to reopen closed connections gracefully.
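
A minimal sketch of that kind of exception handling, under a few assumptions: the class name ReconnectingPublisher, the helper pika_connect, and the retry settings are all invented for the example, and the connection is created through a callable you pass in so the retry logic stays independent of how you connect.

```python
import time


class ReconnectingPublisher:
    """Publish to a fanout exchange, reopening the connection on failure.

    Hypothetical helper for illustration, not part of pika.
    """

    def __init__(self, connect, exchange_name, retry_on=(Exception,),
                 max_attempts=3, delay=1.0):
        self._connect = connect            # callable returning an open connection
        self._exchange_name = exchange_name
        self._retry_on = retry_on          # exception types that trigger a reconnect
        self._max_attempts = max_attempts
        self._delay = delay                # seconds to wait between attempts
        self._connection = None
        self._channel = None

    def _ensure_channel(self):
        # Open (or reopen) the connection and channel only when needed.
        if self._connection is None or self._connection.is_closed:
            self._connection = self._connect()
            self._channel = self._connection.channel()
            self._channel.exchange_declare(exchange=self._exchange_name,
                                           exchange_type='fanout')
        return self._channel

    def publish(self, body):
        for attempt in range(1, self._max_attempts + 1):
            try:
                channel = self._ensure_channel()
                channel.basic_publish(exchange=self._exchange_name,
                                      routing_key='', body=body)
                return
            except self._retry_on:
                self._connection = None    # force a fresh connection next time
                if attempt == self._max_attempts:
                    raise
                time.sleep(self._delay)


def pika_connect(host, credentials):
    # Imported lazily so the class above can be used without pika installed.
    import pika
    return pika.BlockingConnection(
        pika.ConnectionParameters(host=host, credentials=credentials))
```

With pika you would create it along the lines of ReconnectingPublisher(lambda: pika_connect(host, credentials), exchange_name, retry_on=(pika.exceptions.AMQPConnectionError,)) and call publish(message) whenever the stream delivers something. Narrowing retry_on to connection errors keeps unrelated bugs from being retried silently.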