3
votes

I am building a python based interface for pulling data over TCP from an instrument. The datastream comes as specific events, and the timing is not steady: I get bursts of data and then slow periods. They are small data packets, so for simplicity assume they come across as complete packets.

Here is the behavior I get from the socket:

  • Send Event #1: socket.recv returns event #1
  • Send Event #2: socket.recv returns event #2
  • Quickly Send Event #3-50: socket.recv returns only events #3-30 (returns 27 times)
  • Slowly send Event #51: socket returns.recv event #31
  • Slowly send Event #52: socket returns.recv event #32

No data is lost. But there is clearly a buffer somewhere that is filled, and the socket is now returning old data. But shouldn't recv just keep returning till that buffer is empty? Instead, it is only returning when it receives a new packet, despite having a buffer of packets built up. Weird!

Here is the essence of the code (this is for non-blocking, I've also done blocking with just recv - same result). For simplicity I stripped all the packet reassembly stuff. I've carefully traced it back to the socket, so I know that is not to blame.

class mysocket:
    def __init__(self,ip,port):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((ip,port))
        self.keepConn = True
        self.socket.setblocking(0)
        threading.Thread(target = self.rcvThread).start()
        threading.Thread(target = self.parseThread).start()

    def rcvThread(self):
        while self.keepConn:
            readable,writable,inError = select([self.socket],[self.socket],[],.1)
            if readable:
               packet = self.socket.recv(4096)
               self.recvqueue.put_nowait(packet)
            try:
               xmitmsg = self.sendqueue.get_nowait()
            except Queue.Empty:
               pass
            else:
               if writable:
                   self.socket.send(xmitmsg)

    def parseThread(self,rest = .1):
        while self.keepConn:
            try:
                output = self.recvqueue.get_nowait()
                eventnumber = struct.unpack('<H',output[:2]
                print eventnumber
            except Queue.Empty:
                sleep(rest)

Why can't I get the socket to dump all the data in it's buffer? I can never catch up! This one is too odd. Anybody have pointers?

I'm an amateur but I've really done my homework on this one and am completely baffled.

1
_ I stripped all the packet reassembly stuff_ - your code assumes that each recv() is one event but really you will get a lot of them in one recv() and need to parse them out. By "event #31"... is there something in the event that tells you its 31? If you read multiple events in 1 recv() and discarded them, it would look like this.tdelaney
I left out all the packet reconstruction in the post for brevity. I am doing that... Let's say it this way: the next "event" the recv will return for a couple time - I reassemble all that stuff and it is from packet #32 NOT #55 or whatever was sent. The socket isn't flushing out all the data in its buffer???user2946891

1 Answers

3
votes
packet = self.socket.recv(4096)
self.recvqueue.put_nowait(packet)

TCP is a stream-based protocol, not a message-based one. It doesn't preserve message boundaries. Meaning you can't expect to have one recv() call per message. If you send data in a burst, Nagle's algorithm will combine the data into one TCP packet.

Your code assumes that each recv() call returns one "packet", and the parse thread prints the first number from each "packet". But recv() doesn't return packets, it returns chunks of data from the TCP stream. These chunks can contain one message or multiple messages or even partial messages. There's no guarantee that the first two bytes are always event numbers.

Typically, reading data from a TCP connection involves calling recv() multiple times and storing the data you get in a buffer. Once you've received an entire message then you remove the appropriate number of bytes from the buffer and process them.

If you have variable-length messages then you need to keep track of message boundaries yourself. TCP doesn't do it for you like UDP does. That means adding a header containing the message length to the front of each message.

try:
   xmitmsg = self.sendqueue.get_nowait()
except Queue.Empty:
   pass
else:
   if writable:
       self.socket.send(xmitmsg)

On another note, it looks like this code has a bug. It removes messages from the sendqueue whether or not the socket is writable. If the socket's not writable it'll silently throw away messages.