In my live phone speech recognition project, Python's asyncio and websockets modules are used to exchange data between client and server asynchronously. The audio stream to be recognized reaches the client from a PBX channel (Asterisk PBX) via a local wav file that accumulates all data from the moment the call is answered until the hangup event. While the conversation is going on, an async producer pushes chunks of the call record (each no larger than 16 kB) to an asyncio queue, so that a consumer coroutine can write the data to a buffer before sending it to the recognition engine server (my pick is a Vosk instance with the Kaldi engine, which is designed to be accessed through a websocket interface). Once the buffer exceeds a specific capacity (for example 288 kB), the data should be flushed to recognition by the send function and returned (as a transcript of the speech) by recv. Real-time recognition matters here, so I need to guarantee that socket operations like recv will not halt both coroutines during the websocket session (they should be able to keep the queue-based data flow going until the hangup event). Let's take a look at the whole program. First of all, there is a main where an event loop gets instantiated along with a couple of tasks:

import logging
import asyncio
import time
from concurrent.futures._base import CancelledError  

from .transcription import Transcriber, get_record_size_info

logging.basicConfig(level=logging.DEBUG)
record_file_name = '/var/spool/asterisk/monitor/callrecord.wav'    

def main():
    transcriber = Transcriber()       

    logging.getLogger('asyncio').setLevel(logging.ERROR)
    logging.getLogger('asyncio.coroutines').setLevel(logging.ERROR)
    logging.getLogger('websockets.server').setLevel(logging.ERROR)
    logging.getLogger('websockets.protocol').setLevel(logging.ERROR)

    loop = asyncio.get_event_loop()    
    time.sleep(2)

    prod_task = loop.create_task(transcriber.run_producer(transcriber._queue))
    consum_task = loop.create_task(transcriber.run_consumer(transcriber._queue))
            
    tasks = [prod_task, consum_task]
            
    executed, remaining = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)) 
    logging.debug('Tasks completed: %s', executed)
    logging.debug('Tasks in progress: %s', remaining)

    for task in remaining:
        logging.info('Dropping task %s: %s', task, task.cancel())
    try:
        loop.run_until_complete(asyncio.gather(*remaining))
    except CancelledError:
        for running_task in remaining:
            logging.debug('Task dropped %s: %s', running_task, running_task.cancelled())
            
    loop.stop()    
    loop.close()         

if __name__ == '__main__':
    main()       

The producer/consumer implementations are given below:

from queue import Queue
from concurrent.futures._base import CancelledError 
from pathlib import Path

import logging
import asyncio
import websockets
import json

ASR_WS_ADDRESS = 'ws://127.0.0.1:2700'

class Transcriber:

    def __init__(self):
        self._queue = asyncio.Queue()
        self._buffer = b''
        self._current_record_size = 0 # terminate reading from wav file if current size of record is equal to total payload 
        self._record_file_name = '/var/spool/asterisk/monitor/callrecord.wav'
        self._total_payload = 0 # total of bytes written to buffer since call recording started  
           
    async def run_producer(self, qu):
        with open(self._record_file_name, 'rb') as record:
            print('call record file size: ' + str(get_record_size_info(self._record_file_name)))
            self._current_record_size = get_record_size_info(self._record_file_name)
            while True:
                await asyncio.sleep(0.5)
                chunk = record.read(16000)           
                qu.put_nowait(chunk)            
                qsize = qu.qsize()

    async def run_consumer(self, qu):
        while True:            
            data = await qu.get()             
            await asyncio.sleep(1)
            self._buffer += data                      
            self._current_record_size = get_record_size_info(self._record_file_name)            
            print('now buffer contains : ' + str(len(self._buffer)) + ' bytes')
            print('current record size: ' + str(self._current_record_size) + ' bytes')
            print('current total payload: ' + str(self._total_payload) + ' bytes')           
           
            if len(self._buffer) >= 288000:                
                await self.do_recognition()
                self._total_payload += len(data)
                self._buffer = b''               
            elif len(data) == 0 and self._current_record_size == self._total_payload:
                print('looks like recording is complete...')
                await self.do_recognition()               
                self._queue._queue.clear() # remove items from queue before loop gets close      
                self._queue._finished.set()
                self._queue._unfinished_tasks = 0               
                raise Exception('cancel both tasks and close loop')
            else:
                self._total_payload += len(data)
                continue
    
    async def do_recognition(self):
        async with websockets.connect(ASR_WS_ADDRESS) as ws:        
            logging.debug('Sending %s to Vosk-hosted Kaldi engine', len(self._buffer))
            await ws.send(self._buffer)
                                     
            response = json.loads(await ws.recv())
            try:
                result = response['partial']
                if len(result) > 0:
                    print('\n')
                    print(result + '\n')
            except KeyError:
                result = response['text']
                if len(result) > 0:
                    print('\n')
                    print(result + '\n')

def get_record_size_info(record_file_name):
    return Path(record_file_name).stat().st_size

Here is the problem I've been trying to wrap my head around for a few days: how can I run the do_recognition method in a non-blocking manner to avoid the 2-3 second stall once recv starts executing? The longer the call, the more recognition requests I need to fire, so a blocking program becomes disastrous for real-time performance. Because execution is repeatedly stopped and resumed in my case, none of the solutions I've seen on SO so far (specifically, 1, 2, 3) solves this issue, so I'm looking for any pointers on how to deal with it. Please share ideas on what workaround could bring the improvement I want; my own experience with asyncio is far from sufficient to tune the above effectively.

Note probably unrelated to the question: your code is accessing internal implementation attributes of queue, which can stop working at any point, even in a bugfix release of Python. Note: you can import CancelledError from asyncio which exposes it publicly. Also, you don't need to refer to the internal concurrent.futures._base, which just happens to be where the class is defined by the implementation. - user4815162342
As for your main question, I don't fully understand the issue, but perhaps you want to replace await self.do_recognition() with asyncio.create_task(self.do_recognition()) to make do_recognition execute in the background. In that case you probably want to extract the value of self._buffer and pass it to do_recognition as parameter, so that it can transfer the buffer contents independently of the new stuff that arrives. - user4815162342
You've handled this issue perfectly, @user4815162342; you understood exactly what I needed. In my case, though, I used asyncio.ensure_future(self._do_recognition(audio_input)) instead of asyncio.create_task because my Python version is below 3.7 (according to the asyncio references here). The latest comment certainly deserves to become an answer, which I'll accept the next time I visit SO. Thank you very much for this marvellous help. - Haskell bit

1 Answer

If I understand the issue correctly, you probably want to replace await self.do_recognition() with asyncio.create_task(self.do_recognition()) to make do_recognition execute in the background. If you need to support Python 3.6 and earlier, you can use loop.create_task(...) or asyncio.ensure_future(...), all of which in this case do the same thing.

When doing that you'll also need to extract the value of self._buffer and pass it to do_recognition as parameter, so that it can send the buffer contents independently of the new data that arrives.
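The two paragraphs above could be sketched roughly as follows. This is a minimal illustration rather than a drop-in replacement for the asker's class: do_recognition here is a hypothetical stand-in that only sleeps to simulate the websocket send/recv round trip, the None end-of-call sentinel and the results list are assumptions made for the demo, and asyncio.ensure_future is used so the same code also runs on Python versions before 3.7.

```python
import asyncio

FLUSH_THRESHOLD = 288000  # bytes, the capacity mentioned in the question


async def do_recognition(audio, results):
    """Hypothetical stand-in for the websocket send/recv round trip."""
    await asyncio.sleep(0.2)  # simulates the recognizer's response latency
    results.append(len(audio))


async def run_consumer(queue, results):
    buffer = b''
    pending = []  # keep references to the background tasks
    while True:
        data = await queue.get()
        if data is None:  # hypothetical end-of-call sentinel
            break
        buffer += data
        if len(buffer) >= FLUSH_THRESHOLD:
            # Hand the current buffer to a background task and start a
            # fresh buffer right away; the consumer does not wait for recv.
            audio, buffer = buffer, b''
            pending.append(asyncio.ensure_future(do_recognition(audio, results)))
    if buffer:
        # Flush whatever is left once the call has ended.
        pending.append(asyncio.ensure_future(do_recognition(buffer, results)))
    # Wait for all outstanding recognitions before finishing.
    await asyncio.gather(*pending)


async def demo():
    queue = asyncio.Queue()
    results = []
    for _ in range(40):  # 40 chunks of 16000 bytes each
        queue.put_nowait(b'\x00' * 16000)
    queue.put_nowait(None)
    await run_consumer(queue, results)
    return results
```

Because each task receives its own copy of the buffer as an argument, data that arrives while a recognition is in flight goes into the new buffer and cannot be mixed into a request that is already being sent. Keeping the tasks in a list and gathering them at the end also makes sure no recognition is dropped when the loop shuts down.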

Two notes unrelated to the question:

  • The code is accessing internal implementation attributes of queue, which should be avoided in production code because it can stop working at any point, even in a bugfix release of Python. Attributes that begin with _ like _finished and _unfinished_tasks are not covered by backward compatibility guarantees and can be removed, renamed, or change meaning without notice.

  • You can import CancelledError from the top-level asyncio package which exposes it publicly. You don't need to refer to the internal concurrent.futures._base module, which just happens to be where the class is defined by the implementation.
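Both notes can be illustrated with a small sketch. It assumes a hypothetical STOP sentinel that the producer puts on the queue when the recording is over, so the consumer can finish normally instead of clearing the queue's private attributes and raising an exception; it also imports CancelledError from the public asyncio package. The chunk contents and function signatures are made up for the demo.

```python
import asyncio
from asyncio import CancelledError  # public location of the exception

STOP = None  # hypothetical sentinel marking the end of the recording


async def run_producer(queue, chunks):
    for chunk in chunks:
        await queue.put(chunk)
        await asyncio.sleep(0)  # give the consumer a chance to run
    await queue.put(STOP)  # tell the consumer there is no more data


async def run_consumer(queue):
    received = 0
    while True:
        data = await queue.get()
        if data is STOP:
            return received  # normal exit, no queue internals touched
        received += len(data)


async def demo():
    queue = asyncio.Queue()
    producer = asyncio.ensure_future(run_producer(queue, [b'ab', b'cde']))
    consumer = asyncio.ensure_future(run_consumer(queue))
    total = await consumer
    await producer

    # Cancelling a task and catching the publicly exported exception.
    idle = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the idle task start
    idle.cancel()
    cancelled = False
    try:
        await idle
    except CancelledError:
        cancelled = idle.cancelled()
    return total, cancelled
```

With a sentinel, both coroutines return on their own, so main no longer needs FIRST_EXCEPTION handling just to end the session.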