This is a working code for the above requirement.
Code:
import asyncio
import websockets
import json
import threading
from six.moves import queue
from google.cloud import speech
from google.cloud.speech import types
IP = '0.0.0.0'
PORT = 8000
class Transcoder(object):
"""
Converts audio chunks to text
"""
def __init__(self, encoding, rate, language):
self.buff = queue.Queue()
self.encoding = encoding
self.language = language
self.rate = rate
self.closed = True
self.transcript = None
def start(self):
"""Start up streaming speech call"""
threading.Thread(target=self.process).start()
def response_loop(self, responses):
"""
Pick up the final result of Speech to text conversion
"""
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
if result.is_final:
self.transcript = transcript
def process(self):
"""
Audio stream recognition and result parsing
"""
#You can add speech contexts for better recognition
cap_speech_context = types.SpeechContext(phrases=["Add your phrases here"])
client = speech.SpeechClient()
config = types.RecognitionConfig(
encoding=self.encoding,
sample_rate_hertz=self.rate,
language_code=self.language,
speech_contexts=[cap_speech_context,],
model='command_and_search'
)
streaming_config = types.StreamingRecognitionConfig(
config=config,
interim_results=False,
single_utterance=False)
audio_generator = self.stream_generator()
requests = (types.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator)
responses = client.streaming_recognize(streaming_config, requests)
try:
self.response_loop(responses)
except:
self.start()
def stream_generator(self):
while not self.closed:
chunk = self.buff.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = self.buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b''.join(data)
def write(self, data):
"""
Writes data to the buffer
"""
self.buff.put(data)
async def audio_processor(websocket, path):
"""
Collects audio from the stream, writes it to buffer and return the output of Google speech to text
"""
config = await websocket.recv()
if not isinstance(config, str):
print("ERROR, no config")
return
config = json.loads(config)
transcoder = Transcoder(
encoding=config["format"],
rate=config["rate"],
language=config["language"]
)
transcoder.start()
while True:
try:
data = await websocket.recv()
except websockets.ConnectionClosed:
print("Connection closed")
break
transcoder.write(data)
transcoder.closed = False
if transcoder.transcript:
print(transcoder.transcript)
await websocket.send(transcoder.transcript)
transcoder.transcript = None
start_server = websockets.serve(audio_processor, IP, PORT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()