2
votes

I've read the Google documentation and looked at their examples however have not managed to get this working correctly in my particular use case. The problem is that the packets of the audio stream are broken up into smaller chunks (frame size) base64 encoded and sent over MQTT - meaning that the generator approach is likely to stop part way through despite not being fully completed by the sender. My MicrophoneSender component will send the final part of the message with a segment_key = -1, so this is the flag that the complete message has been sent and that a full/final process of the stream can be completed. Prior to that point the buffer may not have all of the complete stream so it's difficult to get either a) the generator to stop yielding b) the google as to return a partial transcription. A partial transcription is required once every 10 or so frames.

To illustrate this better here is my code.

inside receiver:

    STREAMFRAMETHRESHOLD = 10
    def mqttMsgCallback(self, client, userData, msg):
         if msg.topic.startswith("MicSender/stream"):
                msgDict = json.loads(msg.payload)
                streamBytes = b64decode(msgDict['audio_data'].encode('utf-8'))
                frameNum = int(msgDict['segment_num'])

                if frameNum == 0:
                    self.asr_time_start = time.time()
                    self.asr.endOfStream = False

                if frameNum >= 0:
                    self.asr.store_stream_bytes(streamBytes)
                    self.asr.endOfStream = False

                    if frameNum % STREAMFRAMETHRESHOLD == 0:
                        self.asr.get_intermediate_and_print()

                else:
                    #FINAL, recieved -1
                    trans = self.asr.finish_stream()
                    self.send_message(trans)
                    self.frameCount=0

inside Google Speech Class implementation:

class GoogleASR(ASR):

    def __init__(self, name):
        super().__init__(name)    

        # STREAMING
        self.stream_buf = queue.Queue()
        self.stream_gen = self.getGenerator(self.stream_buf)
        self.endOfStream = True
        self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
        self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
        self.current_transcript = ''
        self.numCharsPrinted = 0

    def getGenerator(self, buff):
        while not self.endOfStream:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = buff.get(block=False)
                    data.append(chunk)

                except queue.Empty:
                    self.endOfStream = True
                    yield b''.join(data)
                    break

            yield b''.join(data)


    def store_stream_bytes(self, bytes):
        self.stream_buf.put(bytes)

    def get_intermediate_and_print(self):
        self.get_intermediate()

    def get_intermediate(self):

        if self.stream_buf.qsize() > 1:
            print("stream buf size: {}".format(self.stream_buf.qsize()))
            responses = self.client.streaming_recognize(self.streaming_config, self.requests)
            # print(responses)

            try:
                # Now, put the transcription responses to use.
                if not self.numCharsPrinted:
                    self.numCharsPrinted = 0

                for response in responses:

                    if not response.results:
                        continue

                    # The `results` list is consecutive. For streaming, we only care about
                    # the first result being considered, since once it's `is_final`, it
                    # moves on to considering the next utterance.
                    result = response.results[0]
                    if not result.alternatives:
                        continue

                    # Display the transcription of the top alternative.
                    self.current_transcript = result.alternatives[0].transcript

                    # Display interim results, but with a carriage return at the end of the
                    # line, so subsequent lines will overwrite them.
                    #
                    # If the previous result was longer than this one, we need to print
                    # some extra spaces to overwrite the previous result
                    overwrite_chars = ' ' * (self.numCharsPrinted - len(self.current_transcript))
                    sys.stdout.write(self.current_transcript + overwrite_chars + '\r')
                    sys.stdout.flush()
                    self.numCharsPrinted = len(self.current_transcript)

    def finish_stream(self):
        self.endOfStream = False
        self.get_intermediate()
        self.endOfStream = True

        final_result = self.current_transcript

        self.stream_buf= queue.Queue()
        self.allBytes = bytearray()
        self.current_transcript = ''
        self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
        self.streaming_config = types.StreamingRecognitionConfig(config=self.config)

        return final_result

Currently what this does is output nothing from the transcriptions side.

stream buf size: 21
stream buf size: 41
stream buf size: 61
stream buf size: 81
stream buf size: 101
stream buf size: 121
stream buf size: 141
stream buf size: 159

But the response/transcript is empty. If I put a breakpoint on the for response in responses inside the get_intermediate function then it never runs which means that for some reason it's empty (not retuned from Google). However, if I put a breakpoint on the generator and take too long (> 5 seconds) to continue to yield the data, it (Google) tells me that the data is probably being sent to the server too slow. google.api_core.exceptions.OutOfRange: 400 Audio data is being streamed too slow. Please stream audio data approximately at real time.

Maybe someone can spot the obvious here...

1

1 Answers

2
votes

The way you have organized your code, the generator you give to the Google API is initialized exactly once - on line 10, using a generator expression: self.requests = (...). As constructed, this generator will also run exactly once and become 'exhausted'. Same applies to the generator function that the (for ...) generator itself calls (self.getGeneerator()). It will run once only and stop when it retrieved 10 chunks of data (which are very small, from what I can see). Then, the outer generator (what you assigned to self.requests) will also stop forever - giving the ASR only a short bit of data (10 times 20 bytes, looking at the printed debug output). There's nothing recognizable in that, most likely.

BTW, note you have a redundant yield b''.join(data) in your function, the data will be sent twice.

You will need to redo the (outer) generator so it does not return until all data is received. If you want to use another generator as you do to gather each bigger chunk for the 'outer' generator from which the Google API is reading, you will need to re-make it every time you begin a new loop with it.