I'm having a lot of packet loss using UDP in Python. I know I should use TCP if I don't want packet loss, but I don't have (full) control over the sender.
It's a camera that sends 15 images per second using UDP multicast.
Below you see the code I've written now. It uses multiprocessing to allow the producer and consumer function to work in parallel. Producer function catches the packets, consumer function processes them and writes the images to .bmp files.
I've written a class PacketStream which writes the bytes from the packages to a .bmp file.
When the camera sends a new image, it first sends one packet, with first byte = 0x01. This contains information about the image. Then 612 packets are sent with first byte = 0x02. These contain the bytes from the image (508 bytes/packet).
Since 15 images are sent per second, ~9000 packets are sent per second, although this happens at a faster rate in bursts per image, at ~22 packets/ms.
I can receive all packets perfectly using tcpdump or wireshark. But using the code below, packets are missed. Surely my windows 7 pc should be able to handle this? I'm also using it on a raspberry pi 3, and there more or less the same number of packets is missed. Therefore I think it's a problem with the code.
I've tried lots of different things, like threading instead of multiprocessing, Pipe instead of Queue.
I also tried increasing the socket buffer with
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000)
to no avail.
Is this at all possible in python?
Thanks in advance,
import time
from multiprocessing import Process, Queue
import socket
import struct
from PIL import Image
class PacketStream:
    """Reassemble camera images from a stream of UDP datagrams.

    Datagram layout as used by this class:
      * first byte 0x01: image header; byte 3 carries the image id.
      * first byte 0x02: image data; the payload starts at offset 6.

    A header datagram finishes the previously collected image (it is written
    to disk if complete, reported as lost otherwise) and starts a new one.
    Datagrams with any other first byte, and empty datagrams, are ignored.
    """

    HEADER_ID = b'\x01'
    DATA_ID = b'\x02'
    # Expected payload size of one complete image in bytes
    # (write_image interprets it as 640 x 480 one-byte pixels).
    IMAGE_SIZE = 307200

    def __init__(self, output_path):
        """Create a stream that writes finished images below output_path.

        output_path is used as a plain string prefix for the file name, so it
        must end with a path separator.
        """
        self.output_path = output_path
        # bytearray grows in place; repeated `+=` on an immutable string
        # re-copies the collected image data for every packet.
        self.data_buffer = bytearray()
        self.img_id = -1  # -1 = waiting for start of new image
        # Number of images written so far; used as the file name prefix.
        self.images_written = 0

    def process(self, data):
        """Consume one datagram.

        Raises IndexError if a header datagram is shorter than 4 bytes.
        """
        # Slicing (not indexing) yields a 1-byte string on Python 2 and 3
        # alike and an empty string, instead of an error, for an empty packet.
        message_id = data[0:1]
        if message_id == self.HEADER_ID:
            self.wrap_up_last_image()
            # bytearray indexing returns an int on Python 2 and 3.
            self.img_id = bytearray(data[3:4])[0]
            # Rebind instead of clearing so a buffer handed out earlier is
            # never modified.
            self.data_buffer = bytearray()
        elif message_id == self.DATA_ID:
            self.data_buffer += data[6:]

    def wrap_up_last_image(self):
        """Write the collected image if it is complete, else report the loss.

        Does nothing while no header has been seen yet (img_id == -1).
        """
        # Image id 0 is a valid id; only the -1 sentinel means "no image".
        if self.img_id < 0:
            return
        n_bytes = len(self.data_buffer)
        if n_bytes == self.IMAGE_SIZE:
            path = (self.output_path + str(self.images_written).zfill(7) +
                    '_' + str(self.img_id).zfill(3) + '.bmp')
            write_image(path, bytes(self.data_buffer))
            self.images_written += 1
        else:
            print('Image lost: %s bytes missing.' % (self.IMAGE_SIZE - n_bytes))
def write_image(path, data):
    """Save raw pixel bytes as a 640x480 mode-'L' image and log the write.

    path: destination file name, passed unchanged to PIL's Image.save().
    data: the raw pixel bytes; copied into a bytearray before decoding.

    Prints the current timestamp followed by the path once the file is saved.
    """
    pixels = bytearray(data)
    frame = Image.frombuffer('L', (640, 480), pixels, 'raw', 'L', 0, 1)
    frame.save(path)
    # Single-argument form prints the same text on Python 2 and Python 3.
    print('%s %s' % (time.time(), path))
def producer(q):
    """Receive the camera's multicast datagrams and put each one on q.

    Joins multicast group 239.255.83.71 on UDP port 2271 and then loops
    forever, reading up to 512 bytes per datagram and forwarding the raw
    bytes to the queue. Never returns; the socket is closed if the loop is
    left through an exception.
    """
    # setup socket
    MCAST_GRP = '239.255.83.71'
    MCAST_PORT = 2271
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', MCAST_PORT))
        # '=' selects standard sizes without padding, so the result is always
        # the 8-byte struct ip_mreq. Plain '4sl' uses native alignment and
        # produces 16 bytes where C long is 8 bytes wide.
        mreq = struct.pack('=4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        while True:
            q.put(sock.recv(512))
    finally:
        sock.close()
def consumer(q):
    """Take datagrams from q forever and feed them to a PacketStream.

    The PacketStream is created with the output prefix 'D:/bmpdump/'.
    Never returns.
    """
    stream = PacketStream('D:/bmpdump/')
    handle_packet = stream.process
    while True:
        handle_packet(q.get())
# Image counter used by PacketStream.wrap_up_last_image for file names.
i = 0

if __name__ == '__main__':
    q = Queue()
    # Start the producer first, then the consumer, both sharing one queue.
    for target in (producer, consumer):
        worker = Process(target=target, args=(q,))
        worker.daemon = True  # so they stop when the main prog stops
        worker.start()
    # Let the workers run for ten seconds, then exit.
    time.sleep(10.0)
    print('Program finished.')
EDIT
Thanks for all the suggestions.
1) I tried threading+queue already, also the ''.join(), didn't seem to make much difference. I'm quite sure now the problem is that the producer thread doesn't get enough priority. I can't find how to increase this using Python? Is this even possible?
2) I managed to lose only about 10% using the code below. Processor is at ~25% (on the Raspberry Pi). The key is to consume the data when there's a pause in the packet stream, i.e. when the last data packet has arrived.
import time
import socket
import struct
from PIL import Image
def write_image(path, data):
    """Decode data as a 640x480 mode-'L' image, save it to path, log it.

    The bytes are copied into a bytearray and handed to PIL's raw decoder;
    after saving, the current time and the path are printed on one line.
    """
    image = Image.frombuffer(
        'L', (640, 480), bytearray(data), 'raw', 'L', 0, 1)
    image.save(path)
    # Single-argument form prints the same text on Python 2 and Python 3.
    print('%s %s' % (time.time(), path))
def consume(data_buffer):
    """Join the buffered packets into one image and write it to disk.

    data_buffer: sequence of raw packets. The image id is read from byte 1
    of the first packet; the first 6 bytes of every packet are skipped and
    the remainders are concatenated in order.

    The file name is built from the module-level counter i (7 digits) and
    the image id (3 digits); i is incremented after the write.
    """
    global i
    first_packet = data_buffer[0]
    img_id = ord(first_packet[1])
    payload = ''.join(packet[6:] for packet in data_buffer)
    counter_part = str(i).zfill(7)
    id_part = str(img_id).zfill(3)
    target = '/media/pi/exthdd_02/bmpdump/' + counter_part + '_' + id_part + '.bmp'
    write_image(target, payload)
    i += 1
def producer(sock):
    """Collect datagrams from sock and hand each complete image to consume().

    A datagram whose first byte is 0x01 starts a new image and discards
    whatever was buffered. Every other non-empty datagram is buffered; as
    soon as 612 of them are buffered they are passed to consume().
    Never returns.
    """
    print('Producer start')
    packets_per_image = 612
    data_buffer = []
    while True:
        data = sock.recvfrom(512)[0]
        if not data:
            # Zero-length datagram: nothing to inspect or store.
            continue
        # Slice instead of index so the comparison works on Python 2 and 3.
        if data[0:1] == b'\x01':
            data_buffer = []
            continue
        data_buffer.append(data)
        if len(data_buffer) == packets_per_image:
            consume(data_buffer)
            # Start from an empty buffer right away. Otherwise a lost header
            # packet would let the next image's data pile up behind the one
            # just written, and the count could never hit 612 again.
            data_buffer = []
# image counter
i = 0

# setup socket
MCAST_GRP = '239.255.83.71'
MCAST_PORT = 2271
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# NOTE(review): binding to the group address itself is platform dependent;
# the first listing binds to '' instead - confirm on the target OS.
sock.bind((MCAST_GRP, MCAST_PORT))
# '=' selects standard sizes without padding, so the result is always the
# 8-byte struct ip_mreq. Plain "4sl" uses native alignment and produces
# 16 bytes where C long is 8 bytes wide.
mreq = struct.pack("=4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# NOTE(review): the OS may grant less than requested (Linux limits the value
# to net.core.rmem_max); read it back with getsockopt to verify.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000)

# Runs the receive loop; does not return.
producer(sock)
sock.recvfrom
rather than sock.recv
when dealing with UDP sockets. Maybe that helps? – Irmen de Jong