13
votes

I am experiencing a long (3-hour) delay (EDIT: the delay is brief at first and then gets longer throughout the day) in processing data pushed from a websocket server to my client.py. I know that it is not delayed by the server.

For example, every 5 seconds I see the keep_alive log event and its timestamp, so that part runs smoothly. But when a data frame shows up as processed in the logs, it is actually 3 hours after the server sent it. Am I doing something to delay this process?

Am I calling my coroutine 'keep_alive' correctly? keep_alive is just a message to the server to keep the connection alive. The server echoes the message back. Also, am I logging too much? Could that be delaying the processing? (I don't think so, since I'm seeing the logging events occur right away.)

async def keep_alive(websocket):
    """
    This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
    """
    await websocket.send('Hello')
    await asyncio.sleep(5)

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect( 
            'wss://{}:{}@localhost.urlname.com/ws'.format(user,pswd), ssl=True, ) as websocket:
        while True:    
            """
            Handle message from server.
            """
            message = await websocket.recv()
            if message.isdigit():
                # now = datetime.datetime.now()
                rotating_logger.info ('Keep alive message: {}'.format(str(message)))
            else:
                jasonified_message = json.loads(message)
                for key in jasonified_message:
                    rotating_logger.info ('{}: \n\t{}\n'.format(key,jasonified_message[key]))    
                """
                Store in a csv file.
                """
                try:            
                    convert_and_store(jasonified_message)
                except PermissionError:
                    convert_and_store(jasonified_message, divert = True)                        
            """
            Keep connection alive.
            """            
            await keep_alive(websocket)

"""
Logs any exceptions in logs file.
"""
try:
    asyncio.get_event_loop().run_until_complete(open_connection_test())
except Exception as e:
    rotating_logger.info (e)

EDIT: From the documentation - I'm thinking that this might have something to do with it - but I haven't connected the dots.

The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32. 0 disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.

EDIT 9/28/2018: I'm testing it without the keep-alive message and that doesn't seem to be the issue. Could it be related to the convert_and_store() function? Does this need to be async def and then awaited as well?

def convert_and_store(data, divert = False, test = False):
    if test:
        data = b
    fields = data.keys()
    file_name =  parse_call_type(data, divert = divert)
    json_to_csv(data, file_name, fields)

EDIT 10/1/2018: It seems that both the keep-alive message and convert_and_store are both at issue; if I extend the keep-alive message to 60 seconds - then convert_and_store will run only once per 60 seconds. So convert_and_store is waiting on the keep_alive()...
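
The observation above also accounts for the delay growing over the day. Below is a toy model, not a measurement of the real system: it assumes messages arrive at a fixed interval and that the loop can call recv() only once per keep-alive sleep. The function name and the numbers are made up for illustration.

```python
def processing_delays(arrival_interval, loop_interval, count):
    """Seconds between arrival and processing for each of `count` messages."""
    delays = []
    free_at = 0  # time at which the loop can next call recv()
    for n in range(count):
        arrived = n * arrival_interval
        started = max(arrived, free_at)  # wait for the loop if it is still sleeping
        delays.append(started - arrived)
        free_at = started + loop_interval  # the loop sleeps before the next recv()
    return delays


if __name__ == "__main__":
    # Assumed numbers: one message per second, one recv() per 5-second sleep.
    print(processing_delays(1, 5, 4))
```

With one message per second and a 5-second sleep per iteration, the first four delays are 0, 4, 8 and 12 seconds, so the backlog only ever grows.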

3
Have you tried simply sending and receiving a message, such as "Hello" and "Hello back"? Does the delay still occur? - MT-FreeHK
Good point. That's actually what I'm doing with my 'keep alive' message, and it has no delay. But processing the actual (non-keep-alive) messages and appending them to the csv file is what gets more and more delayed throughout the day. - Liam Hanninen
In that case, maybe you can try multiprocessing. Move the csv handling into a function and use p = Process(target=my_function, args=argss) to take that task out of the async/await flow, and see whether that avoids the problem. - MT-FreeHK
That sounds like a solid backup, thanks. I'll look into the docs and test it. Without any experience with multiprocessing, I'd prefer to stick with the asyncio library. - Liam Hanninen

3 Answers

9
votes

Could it be related to the convert_and_store() function?

Yes, it could be. Blocking code should not be called directly. If a function performs a CPU-intensive calculation for 1 second, all asyncio Tasks and IO operations would be delayed by 1 second.
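
A minimal sketch of that effect, using made-up handler names: a heartbeat task ticks every 0.05 seconds while a handler runs next to it. The handler that calls time.sleep() freezes the heartbeat for its whole duration; the one that awaits asyncio.sleep() does not.

```python
import asyncio
import time


async def heartbeat(ticks, interval=0.05, count=4):
    """Record the time of each tick; stands in for any other task on the loop."""
    for _ in range(count):
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_handler():
    time.sleep(0.3)  # blocks the whole event loop


async def cooperative_handler():
    await asyncio.sleep(0.3)  # yields to the loop while waiting


async def largest_gap(handler):
    """Run heartbeat alongside handler; return the longest pause between ticks."""
    ticks = []
    beat = asyncio.ensure_future(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    await handler()
    await beat
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print("blocking:    %.2fs" % asyncio.run(largest_gap(blocking_handler)))
    print("cooperative: %.2fs" % asyncio.run(largest_gap(cooperative_handler)))
```

The blocking run reports a gap of at least 0.3 seconds between two ticks; the cooperative run stays close to the 0.05-second interval.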

An executor can be used to run blocking code in a different thread or process:

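import asyncio
import concurrent.futures
import time

def long_runned_job(x):
    # Blocking work; it runs in a worker process, not in the event loop.
    time.sleep(2)
    print("Done ", x)

async def test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for i in range(5):
            # Schedule the job and return immediately; the loop stays free.
            loop.run_in_executor(pool, long_runned_job, i)
            print(i, " is runned")
            await asyncio.sleep(0.5)

if __name__ == "__main__":  # needed for process pools on spawn-based platforms
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test())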

In your case it should look something like this:

import concurrent.futures

async def open_connection_test():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async with websockets.connect(...) as websocket:
            while True:    
                ...
                loop.run_in_executor(pool, convert_and_store, args)
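
One thing to watch with a fire-and-forget run_in_executor call is that the PermissionError fallback from the question no longer triggers, because the exception ends up on the returned future. A possible sketch that keeps the fallback is below. It assumes a ThreadPoolExecutor instead of a process pool, and convert_and_store here is a hypothetical stand-in that appends to a list rather than writing a csv file.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

stored = []  # stands in for the csv files in this sketch


def convert_and_store(data, divert=False):
    """Hypothetical stand-in for the question's function."""
    if data.get("locked") and not divert:
        raise PermissionError("file is open elsewhere")
    stored.append((data["id"], divert))


async def store_in_background(loop, pool, data):
    """Await the executor future so exceptions surface here, then retry diverted."""
    try:
        await loop.run_in_executor(pool, convert_and_store, data)
    except PermissionError:
        await loop.run_in_executor(pool, convert_and_store, data, True)


async def main(messages):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        tasks = []
        for message in messages:  # stands in for the `while True` / recv() loop
            # Schedule the storage and go straight back to receiving.
            tasks.append(asyncio.ensure_future(store_in_background(loop, pool, message)))
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main([{"id": 1}, {"id": 2, "locked": True}]))
    print(stored)
```

A single worker is used so that writes happen one at a time and in submission order, which is usually what you want when appending to a file.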

EDITED:

It seems that both the keep-alive message and convert_and_store are both at issue

You can run keep_alive in the background:

async def keep_alive(ws):
    while ws.open:
        await ws.ping(...)   
        await asyncio.sleep(...)

async with websockets.connect(...) as websocket:
    asyncio.ensure_future(keep_alive(websocket))
    while True:    
        ...
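
The background-task pattern can be tried without a server. In the sketch below, FakeWebSocket is a hypothetical stand-in for a real connection (it is not part of the websockets library), and the receive loop ends after a fixed number of messages so the example terminates.

```python
import asyncio


class FakeWebSocket:
    """Hypothetical stand-in for a websockets connection, for illustration only."""

    def __init__(self, incoming):
        self._incoming = list(incoming)
        self.sent = []

    async def send(self, message):
        self.sent.append(message)

    async def recv(self):
        await asyncio.sleep(0.01)  # pretend to wait on the network
        return self._incoming.pop(0)


async def keep_alive(ws, interval):
    while True:
        await ws.send("Hello")
        await asyncio.sleep(interval)


async def receive_all(ws, count, interval):
    """Receive `count` messages while keep_alive runs as a separate task."""
    pinger = asyncio.ensure_future(keep_alive(ws, interval))
    received = []
    try:
        for _ in range(count):
            received.append(await ws.recv())  # never waits on the keep-alive sleep
    finally:
        pinger.cancel()  # stop the background task when the receive loop ends
    return received


if __name__ == "__main__":
    ws = FakeWebSocket(["a", "b", "c"])
    print(asyncio.run(receive_all(ws, 3, interval=60)))
    print(ws.sent)
```

Even with a 60-second keep-alive interval, all three messages are received right away, because the sleep happens in the keep_alive task and not in the receive loop.
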
1
votes

You need to run this keep_alive() function separately, for example in a new thread.

With async/await, an awaited coroutine has to finish before execution moves on to the next statement.

So await keep_alive(websocket) effectively blocks the receive loop for the duration of the sleep. You could drop the await so that processing continues, but that is surely not what you want.

What you actually want is two independent timelines: one for communicating with the server and one for keeping the connection alive. They should be separated, since they are different jobs.

So the approach here is to run the keep-alive in its own Thread rather than inside the receive loop, to keep things simple.

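First, change keep_alive() to the following. Because send() is a coroutine, the thread hands it to the event loop instead of calling it directly.

import asyncio
import threading
import time

def keep_alive(websocket, loop):
    """
        This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
    """
    while True:
        # send() is a coroutine, so schedule it on the event loop from this thread.
        asyncio.run_coroutine_threadsafe(websocket.send('Hello'), loop)
        time.sleep(1)

Then, in open_connection_test():

async def open_connection_test():
    """
    Establishes web socket (WSS). Receives data and then stores in csv.
    """
    async with websockets.connect(...) as websocket:
        # Start the thread only after the connection exists.
        thread = threading.Thread(target=keep_alive, args=(websocket, asyncio.get_event_loop()))
        thread.daemon = True   # Daemonize
        thread.start()
        ....
        # This line is no longer needed:
        # await keep_alive(websocket)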
0
votes

I think this would be clearer: use a ThreadPoolExecutor to run the blocking code in the background.

import asyncio
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=4)

def convert_and_store(data, divert=False, test=False):
    loop = asyncio.get_event_loop()
    # Pass data through as well; run_in_executor forwards positional arguments only.
    loop.run_in_executor(pool, _convert_and_store, data, divert, test)


def _convert_and_store(data, divert=False, test=False):
    if test:
        data = b
    fields = data.keys()
    file_name = parse_call_type(data, divert=divert)
    json_to_csv(data, file_name, fields)

Demo of sending a keep-alive message with asyncio:

async def keep_alive(websocket):
    while True:
        await websocket.send('ping')  # with aiohttp this would be send_str('ping')
        await asyncio.sleep(10)