I have a static DNS named vops-server.com
that points to my Dell PowerEdge 2950. On this 2950 I run the server.py
which initially binds to tcp://0.0.0.0:7777
, and subsequently redirects any incoming clients to a socket bound to a port greater than 7777
, the first of which being 7778
. This newly-spawned ROUTER-DEALER
pair simply echo "Hello, world!"
.
The Dell PowerEdge 2950 operates Ubuntu 16.04.1 LTS
and fails to run the code properly, outputting the following upon a KeyboardInterrupt:
k▒Eg
Listening on port 7778
Recv
Resource temporarily unavailable
Client on port 7778 disconnected
^CProcess Process-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "test_server.py", line 76, in worker_task
data = worker_socket.recv_multipart()
File "/usr/local/lib/python2.7/dist-packages/zmq/sugar/socket.py", line 395, in recv_multipart
parts = [self.recv(flags, copy=copy, track=track)]
File "zmq/backend/cython/socket.pyx", line 693, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7283)
File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:7081)
File "zmq/backend/cython/socket.pyx", line 145, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:2033)
File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:7522)
PyErr_CheckSignals()
KeyboardInterrupt
While the client outputs the following:
Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
At this point the client is supposed to output a final line of [Host] Hello, world!
, yet it hangs waiting to receive a message.
Now, on my laptop which operates Windows 10 Home 1511
, the output is as expected:
Ç )
Listening on port 7778
Recv
['\x00\x80\x00\x00)', 'Hello, world!']
Sent
With the client now properly outputting:
Connecting to distribution server tcp://vops-server.com:7777
[Dist] Send
[Dist] Recv
[Dist] 7778
Connecting to host tcp://vops-server.com:7778
[Host] Send
[Host] Recv
[Host] Hello, world!
Please review the code below.
server.py:
import sys
import zmq
from multiprocessing import Process, Queue, Array, Value
import time
def server_task():
port_base = 7777
server_context = zmq.Context.instance()
server_socket = server_context.socket(zmq.ROUTER)
server_socket.bind("tcp://0.0.0.0:%d" % (port_base, ))
timeout_queue = Queue()
port_list = [ 1 ]
proc_list = [ ]
while True:
try:
client_id = server_socket.recv_multipart()[0]
print(client_id)
# Get an unused port from the list
# Ports from clients that have timed out are recycled here
while not timeout_queue.empty():
port_list.append(timeout_queue.get())
port_offset = port_list.pop()
if len(port_list) == 0:
port_list.append(port_offset + 1)
# Spawn a new worker task, binding the port to a socket
proc_running = Value("b", True)
proc_list.append(proc_running)
Process(target=worker_task, args=(proc_running, port_base, port_offset, timeout_queue)).start()
# Send the new port to the client
server_socket.send_multipart([client_id, str(port_base + port_offset)])
except KeyboardInterrupt:
break
for proc_running in proc_list:
proc_running.value = False
server_socket.close()
server_context.term()
def worker_task(proc_running, port_base, port_offset, timeout_queue):
port = port_base + port_offset
print("Listening on port %d" % (port, ))
worker_context = zmq.Context.instance()
worker_socket = worker_context.socket(zmq.ROUTER)
worker_socket.setsockopt(zmq.RCVTIMEO, 5000)
worker_socket.bind("tcp://0.0.0.0:%d" % (port, ))
while proc_running.value:
try:
print("Recv")
data = worker_socket.recv_multipart()
print(data)
worker_socket.send_multipart(data)
print("Sent")
except zmq.ZMQError as e:
print(e)
break
print("Client on port %d disconnected" % (port, ))
timeout_queue.put(port_offset)
worker_socket.close()
worker_context.term()
if __name__ == "__main__":
server_task()
client.py:
import os
import io
import time
import zmq
from multiprocessing import Process
def connect_to_host(context, host, port, timeout):
address = ("tcp://%s:%s" % (host, port))
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to distribution server %s" % (address, ))
while True:
try:
print("[Dist] Send")
socket.send_multipart([str(0)])
print("[Dist] Recv")
port = socket.recv_multipart()[0]
print("[Dist] %s" % (port, ))
break
except zmq.Again:
socket.close()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to distribution server %s" % (address, ))
socket.close()
address = ("tcp://%s:%s" % (host, port))
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.RCVTIMEO, timeout)
socket.connect(address)
print("Connecting to host %s" % (address, ))
return socket
def client_task(client_type):
timeout = 5000
host = "vops-server.com"
port = "7777"
context = zmq.Context().instance()
socket = connect_to_host(context, host, port, timeout)
while True:
try:
try:
print("[Host] Send")
socket.send_multipart(["Hello, world!"])
print("[Host] Recv")
data = socket.recv_multipart()[0]
print("[Host] %s" % (data, ))
except zmq.Again:
socket.close()
socket = connect_to_host(context, host, port, timeout)
except KeyboardInterrupt:
break
print("Connection terminated")
socket.close()
context.term()
if __name__ == "__main__":
client_task(0)
I can't possibly fathom why the same code works on the one machine and not the other; I've considered installing Windows Server 2012 on the Dell PowerEdge 2950 with my hopes being that the error lay in the OS and not the hardware itself. For now I wait and hope that an expert somewhere has the solution to my problem.