0
votes

I have a static DNS named vops-server.com that points to my Dell PowerEdge 2950. On this 2950 I run the server.py which initially binds to tcp://0.0.0.0:7777, and subsequently redirects any incoming clients to a socket bound to a port greater than 7777, the first of which being 7778. This newly-spawned ROUTER-DEALER pair simply echo "Hello, world!".

The Dell PowerEdge 2950 operates Ubuntu 16.04.1 LTS and fails to run the code properly, outputting the following upon a KeyboardInterrupt:

k▒Eg
Listening on port 7778
Recv
Resource temporarily unavailable
Client on port 7778 disconnected
^CProcess Process-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "test_server.py", line 76, in worker_task
    data = worker_socket.recv_multipart()
  File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 395, in recv_multipart
    parts = [self.recv(flags, copy=copy, track=track)]
  File "zmq/backend/cython/socket.pyx", line 693, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7283)
  File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7081)
  File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:2033)
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7522)
    PyErr_CheckSignals()
KeyboardInterrupt

While the client outputs the following:

Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv

At this point the client is supposed to output a final line of [Host] Hello, world!, yet it hangs waiting to receive a message.

Now, on my laptop which operates Windows 10 Home 1511, the output is as expected:

 Ç  )
Listening on port 7778
Recv
['\x00\x80\x00\x00)', 'Hello, world!']
Sent

With the client now properly outputting:

Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
[Host] Hello, world!

Please review the code below.

server.py:

import sys
import zmq
from multiprocessing import Process, Queue, Array, Value
import time

def server_task():
    port_base = 7777

    server_context = zmq.Context.instance()

    server_socket = server_context.socket(zmq.ROUTER)

    server_socket.bind("tcp://0.0.0.0:%d" % (port_base, ))

    timeout_queue = Queue()
    port_list = [ 1 ]

    proc_list = [ ]

    while True:
        try:
            client_id = server_socket.recv_multipart()[0]

            print(client_id)

            # Get an unused port from the list
            # Ports from clients that have timed out are recycled here

            while not timeout_queue.empty():
                port_list.append(timeout_queue.get())

            port_offset = port_list.pop()

            if len(port_list) == 0:
                port_list.append(port_offset + 1)

            # Spawn a new worker task, binding the port to a socket

            proc_running = Value("b", True)

            proc_list.append(proc_running)

            Process(target=worker_task, args=(proc_running, port_base, port_offset, timeout_queue)).start()

            # Send the new port to the client

            server_socket.send_multipart([client_id, str(port_base + port_offset)])

        except KeyboardInterrupt:
            break

    for proc_running in proc_list:
        proc_running.value = False

    server_socket.close()
    server_context.term()

def worker_task(proc_running, port_base, port_offset, timeout_queue):
    port = port_base + port_offset

    print("Listening on port %d" % (port, ))

    worker_context = zmq.Context.instance()

    worker_socket = worker_context.socket(zmq.ROUTER)

    worker_socket.setsockopt(zmq.RCVTIMEO, 5000)
    worker_socket.bind("tcp://0.0.0.0:%d" % (port, ))

    while proc_running.value:
        try:
            print("Recv")
            data = worker_socket.recv_multipart()

            print(data)
            worker_socket.send_multipart(data)

            print("Sent")
        except zmq.ZMQError as e:
            print(e)
            break

    print("Client on port %d disconnected" % (port, ))

    timeout_queue.put(port_offset)

    worker_socket.close()
    worker_context.term()

if __name__ == "__main__":
    server_task()

client.py:

import os
import io
import time
import zmq
from multiprocessing import Process

def connect_to_host(context, host, port, timeout):
    address = ("tcp://%s:%s" % (host, port))

    socket = context.socket(zmq.DEALER)

    socket.setsockopt(zmq.RCVTIMEO, timeout)
    socket.connect(address)

    print("Connecting to distribution server %s" % (address, ))

    while True:
        try:
            print("[Dist] Send")
            socket.send_multipart([str(0)])
            print("[Dist] Recv")
            port = socket.recv_multipart()[0]
            print("[Dist] %s" % (port, ))

            break
        except zmq.Again:
            socket.close()

            socket = context.socket(zmq.DEALER)

            socket.setsockopt(zmq.RCVTIMEO, timeout)
            socket.connect(address)

            print("Connecting to distribution server %s" % (address, ))

    socket.close()

    address = ("tcp://%s:%s" % (host, port))

    socket = context.socket(zmq.DEALER)

    socket.setsockopt(zmq.RCVTIMEO, timeout)
    socket.connect(address)

    print("Connecting to host %s" % (address, ))

    return socket

def client_task(client_type):
    timeout = 5000
    host = "vops-server.com"
    port = "7777"

    context = zmq.Context().instance()

    socket = connect_to_host(context, host, port, timeout)

    while True:
        try:
            try:
                print("[Host] Send")
                socket.send_multipart(["Hello, world!"])

                print("[Host] Recv")
                data = socket.recv_multipart()[0]

                print("[Host] %s" % (data, ))
            except zmq.Again:
                socket.close()

                socket = connect_to_host(context, host, port, timeout)

        except KeyboardInterrupt:
            break

    print("Connection terminated")

    socket.close()
    context.term()

if __name__ == "__main__":
    client_task(0)

I can't possibly fathom why the same code works on the one machine and not the other; I've considered installing Windows Server 2012 on the Dell PowerEdge 2950 with my hopes being that the error lay in the OS and not the hardware itself. For now I wait and hope that an expert somewhere has the solution to my problem.

1

1 Answers

0
votes

Same code works fine on the Dell PowerEdge 2950 running Windows Server 2012 R2. ZMQ appears to have issues with Ubuntu.