1
votes

Summary

I need to stream from Text to Speech (tts) to Azure Storage Blobs from an Azure Function (serverless). The code works but doesn't complete in the order I expect.

Functional description

The code converts text to an audio (.mp3) file via streaming. The text-to-speech (tts) transformation is via a REST API inside the TextToSpeech class that works with streams. The tts call writes the audio stream to a writable stream - which is the Azure storage blob.

Expect

I think the call to the function that creates the blob should complete after the blob is finished writing but that isn't the case. Is this code correct and if it isn't, where is the problem?

I expect the output to be:

  • The end! - from tts
  • Blob result - from fn
  • then - from code to call fn

Receive

The received output (shown at bottom) is:

  • then
  • Blob result
  • The end!

Code for serverless function:

require('dotenv').config();
const TextToSpeech = require("./textToSpeech");
const azure = require('azure-storage');

/**
 * Converts a fixed sentence to speech, streams the audio into the block blob
 * `diberry/test.mp3` in the `function-blob-tts` container, then requests the
 * blob's properties.
 *
 * Connection settings are read from environment variables.
 */
const fn = async () => {

    const blobService = azure.createBlobService(process.env.STORAGECONNECTIONSTRING);

    const textToSpeech = new TextToSpeech({
        accessTokenHost: process.env.SPEECHACCESSTOKENHOST,
        ttsHost: process.env.SPEECHRESOURCETTSHOST,
        ttsKey: process.env.SPEECHRESOURCETTSKEY
    });

    // The blob name is "<userName>/<file name>".
    const userName = "diberry";
    const container = "function-blob-tts";
    const directory = userName;
    const transformConfig = {"filenameandpath": 'test.mp3'};

    const blobName = directory + "/" + transformConfig.filenameandpath;

    // Writable stream that receives the audio bytes.
    // DOCS: https://azure.github.io/azure-storage-node/BlobService.html#createWriteStreamToBlockBlob__anchor
    const writableStream = blobService.createWriteStreamToBlockBlob(container, blobName, { blockIdPrefix: 'block' });

    await textToSpeech.transform(transformConfig, "This is a brand new world.", writableStream);

    // Blob properties are delivered through a Node-style callback.
    // `await` applies to whatever the call itself returns, so the value
    // returned from inside the callback is not what `fn` resolves with.
    // NOTE(review): a throw inside the callback runs outside this async
    // function's promise chain - confirm how it surfaces.
    return await blobService.getBlobProperties(container, blobName, (err, results)=>{
        if (err) throw err;
        console.log(results);
        if (results) return results;
    });

}

Code to call serverless fn:

// Runs fn and logs "then" once its promise fulfils; the fulfilment value is
// not used. A rejection is logged with an "err received" marker.
fn().then(results => {
    console.log("then");
}).catch(err => {
    console.log("err received");
    console.log(err);
})

TextToSpeech class:

    const rp = require('requestretry');

/**
 * Client for a text-to-speech REST service: fetches an access token, posts an
 * SSML document and pipes the audio response into a caller-supplied writable
 * stream.
 */
class TextToSpeech {

    /**
     * Stores the connection settings and the retry tuning used for the
     * text-to-speech request.
     *
     * @param config - settings object; this class reads `accessTokenHost`,
     *                 `ttsHost` and `ttsKey` from it
     */
    constructor(config) {
        this.config = config;
        this.delayMS = 500; // passed to requestretry as `retryDelay`
        this.retry = 5;     // passed to requestretry as `maxAttempts`
    }

    // Retry predicate passed to requestretry as `retryStrategy`:
    // truthy when an error occurred or the response status is 429.
    retryStrategy(err, response) {
        let shouldRetry = err || response.statusCode === 429;

        return shouldRetry;
    };

    // Gets an access token by POSTing the subscription key to the token
    // endpoint; resolves with the response body.
    async getAccessToken() {
        const options = {
            method: 'POST',
            uri: `https://${this.config.accessTokenHost}/sts/v1.0/issueToken`,
            headers: {
                'Ocp-Apim-Subscription-Key': this.config.ttsKey,
            },
        };
        const response = await rp(options);

        return response.body;
    };
    // Make sure to update User-Agent with the name of your resource.
    // You can also change the voice and output formats. See:
    // https://docs.microsoft.com/azure/cognitive-services/speech-service/language-support#text-to-speech
    /**
     * Posts the text as SSML and pipes a 200 response into `writableStream`.
     *
     * @param accessToken - good for 10 minutes, used immediately
     * @param transformConfig - ttsConfigs; `selectedVoice` is overwritten on
     *                          this object
     * @param text - inserted into the SSML body as-is
     * @param writableStream - destination for the audio bytes
     */
    async textToSpeech(accessToken, transformConfig, text, writableStream) {
        try {
            // Overwrites any voice selection the caller supplied.
            transformConfig.selectedVoice = {
                gender: 'female',
                locale: 'en-us',
                code: 'Jessa24KRUS',
            };

            // Create the SSML request.
            let body = `<?xml version="1.0"?><speak version="1.0" xml:lang="en-us"><voice xml:lang="en-us" name="Microsoft Server Speech Text to Speech Voice (${transformConfig.selectedVoice.locale}, ${transformConfig.selectedVoice.code})"><prosody rate="-20.00%">${text}</prosody></voice></speak>`;

            let options = {
                method: 'POST',
                baseUrl: `https://${this.config.ttsHost}/`,
                url: '/cognitiveservices/v1',
                headers: {
                    Authorization: 'Bearer ' + accessToken,
                    'cache-control': 'no-cache',
                    'User-Agent': 'YOUR_RESOURCE_NAME',
                    'X-Microsoft-OutputFormat': 'audio-24khz-48kbitrate-mono-mp3',
                    'Content-Type': 'application/ssml+xml',
                },
                //timeout: 120000,
                body: body,
                maxAttempts: this.retry,
                retryDelay: this.delayMS,
                retryStrategy: this.retryStrategy,
            };

            // request has binary audio file
            // `await` applies to the object returned by the `.on(...)` chain.
            // NOTE(review): nothing here waits for the 'finish' event, so this
            // method can return before the audio has been fully written.
            await rp(options)
                .on('response', async (response) => {
                    if (response.statusCode === 200) {
                        // Logs once the writable stream reports it has finished.
                        writableStream.on('finish', () => {
                            console.log('The end!');
                        });
                        response.pipe(writableStream);
                    } else {
                        // NOTE(review): thrown inside an event listener, i.e.
                        // outside the surrounding try/catch - confirm how it surfaces.
                        throw Error('Response statusCode ' + response.statusCode);
                    }
                })
                .on('error', err => {
                    // NOTE(review): same caveat as above for throws in listeners.
                    throw err;
                });
        } catch (err) {
            throw err;
        }
    }

    /**
     * Fetches an access token, then runs the text-to-speech request with it.
     *
     * @param transformConfig - passed through to textToSpeech
     * @param text - text to speak
     * @param writableStream - destination for the audio bytes
     */
    async transform(transformConfig, text, writableStream) {
        try {
            // get token - access token is good for 10 minutes
            const accessToken = await this.getAccessToken();

            // get binary and return in in/out writableStream
            await this.textToSpeech(accessToken, transformConfig, text, writableStream);
        } catch (err) {
            throw err;
        }
    }
}


module.exports = TextToSpeech;

Output from console

When run from a console:

then
BlobResult {
  container: 'function-blob-tts',
  name: 'diberry/test.mp3',
  metadata: {},
  lastModified: 'Sun, 25 Aug 2019 13:06:25 GMT',
  creationTime: 'Sun, 25 Aug 2019 12:38:50 GMT',
  etag: '"0x8D7295D08E34C0E"',
  blobType: 'BlockBlob',
  contentLength: '19008',
  serverEncrypted: 'true',
  requestId: 'caa7abc9-701e-00ff-0b47-5b694c000000',
  contentSettings:
   { contentType: 'application/octet-stream',
     contentMD5: 'FN99sCq5XC3DOnAucPHtCA==' },
  lease: { status: 'unlocked', state: 'available' } }
The end!
1

1 Answers

0
votes

Had to change the code to make the order work as I wanted.

The transform isn't necessary - I just wanted to see what was going on.

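The pipe to the writable stream is now wrapped in a promise that resolves only when the stream emits 'finish', so the awaiting caller does not continue until the audio has been written. The transform stream only prints a dot for each chunk, which makes the progress of the stream visible.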

The callback-style blob properties call was also breaking the order: awaiting a function that reports its result through a callback does not wait for that callback. I converted it into a promise - don't mix callbacks and promises.

Code for serverless function

require('dotenv').config();
const TextToSpeech = require("./textToSpeech");
const azure = require('azure-storage');

/**
 * Promise adapter around the callback-based `blobService.getBlobProperties`.
 *
 * @param blobService - object exposing `getBlobProperties(container, blobName, callback)`
 * @param container - container name
 * @param blobName - blob name inside the container
 * @returns {Promise<*>} resolves with the `results` value handed to the
 *          callback; rejects with the callback's `err`, or with anything the
 *          call throws synchronously
 */
const getBlobProperties = (blobService, container, blobName) => {
    // A synchronous throw inside the executor becomes a rejection, so no
    // try/catch is needed here.
    return new Promise((resolve, reject) => {
        blobService.getBlobProperties(container, blobName, (err, results) => {
            // Reject rather than throw: the callback runs after the executor
            // has returned, so a throw here would never reach the promise.
            if (err) {
                reject(err);
                return;
            }

            console.log(`getBlobProperties - ${JSON.stringify(results)}`);
            console.log(`getBlobProperties - done`);

            // Always settle, even when no result object was supplied.
            resolve(results);
        });
    });
};

/**
 * Converts a fixed sentence to speech, streams the audio into the block blob
 * `diberry/6-test.mp3` in the `function-blob-tts` container, then fetches the
 * blob's properties.
 *
 * Connection settings are read from environment variables.
 *
 * @returns {Promise<*>} resolves with the value produced by getBlobProperties
 * @throws rethrows any failure after logging it, so the caller's `.catch`
 *         handler actually runs
 */
const fn = async () => {

    try {

        const blobService = azure.createBlobService(process.env.STORAGECONNECTIONSTRING);

        const textToSpeech = new TextToSpeech({
            accessTokenHost: process.env.SPEECHACCESSTOKENHOST,
            ttsHost: process.env.SPEECHRESOURCETTSHOST,
            ttsKey: process.env.SPEECHRESOURCETTSKEY
        });

        // The blob name is "<userName>/<file name>".
        const userName = "diberry";
        const container = "function-blob-tts";
        const directory = userName;
        const transformConfig = {"filenameandpath": '6-test.mp3'};

        const blobName = `${directory}/${transformConfig.filenameandpath}`;

        // Writable stream that receives the audio bytes.
        // DOCS: https://azure.github.io/azure-storage-node/BlobService.html#createWriteStreamToBlockBlob__anchor
        const writableStream = blobService.createWriteStreamToBlockBlob(container, blobName, { blockIdPrefix: 'block' });

        await textToSpeech.transform(transformConfig, "This is a brand new world.", writableStream);

        console.log(`N-2 textToSpeech.transform done`);

        const blobProperties = await getBlobProperties(blobService, container, blobName);
        console.log(`N-1 blob properties done`);

        return blobProperties;

    } catch (err) {
        console.log(`function error - ${err}`);
        // Propagate: swallowing the error here would make the caller report
        // success after a failure.
        throw err;
    }

};

Code to call serverless fn:

// Outcome reporters for the run below; the fulfilment value is not used.
const reportSuccess = () => {
    console.log("N function done");
};

const reportFailure = (err) => {
    console.log("function err received");
    console.log(err);
};

// Kick off the function and log how it ended.
fn().then(reportSuccess).catch(reportFailure);

TextToSpeech class:

const rp = require('requestretry');

/**
 * Client for a text-to-speech REST service: fetches an access token, posts an
 * SSML document and streams the audio response into a caller-supplied
 * writable stream.
 */
class TextToSpeech {

    /**
     * Stores the connection settings and the retry tuning used for the
     * text-to-speech request.
     *
     * @param config - settings object; this class reads `accessTokenHost`,
     *                 `ttsHost` and `ttsKey` from it
     */
    constructor(config) {
        this.config = config;
        this.delayMS = 500; // passed to requestretry as `retryDelay`
        this.retry = 5;     // passed to requestretry as `maxAttempts`
    }

    /**
     * Retry predicate passed to requestretry as `retryStrategy`.
     *
     * @param err - error from the attempt, if any
     * @param response - response of the attempt; may be missing when `err` is set
     * @returns {boolean} true when an error occurred or the status is 429
     */
    retryStrategy(err, response) {
        if (err) {
            return true;
        }

        return Boolean(response) && response.statusCode === 429;
    }

    /**
     * Gets an access token by POSTing the subscription key to the token endpoint.
     *
     * @returns {Promise<*>} resolves with the response body
     */
    async getAccessToken() {
        const options = {
            method: 'POST',
            uri: `https://${this.config.accessTokenHost}/sts/v1.0/issueToken`,
            headers: {
                'Ocp-Apim-Subscription-Key': this.config.ttsKey,
            },
        };
        const response = await rp(options);

        return response.body;
    }

    // Make sure to update User-Agent with the name of your resource.
    // You can also change the voice and output formats. See:
    // https://docs.microsoft.com/azure/cognitive-services/speech-service/language-support#text-to-speech
    /**
     * Posts the text as SSML and streams a 200 response into `writableStream`.
     *
     * @param accessToken - good for 10 minutes, used immediately
     * @param transformConfig - ttsConfigs; `selectedVoice` is overwritten on
     *                          this object
     * @param text - inserted into the SSML body as-is
     * @param writableStream - destination for the audio bytes
     * @returns {Promise<void>} resolves when `writableStream` emits 'finish';
     *          rejects on a request error, a stream error, or a response
     *          whose status code is not 200
     */
    async textToSpeech(accessToken, transformConfig, text, writableStream) {
        // Local import, kept inside the method as before.
        const { Transform } = require('stream');

        // Overwrites any voice selection the caller supplied.
        transformConfig.selectedVoice = {
            gender: 'female',
            locale: 'en-us',
            code: 'Jessa24KRUS',
        };

        // Create the SSML request.
        // NOTE(review): `text` is not XML-escaped; characters such as `&` or `<`
        // end up verbatim in the document - confirm callers pass plain text.
        const body = `<?xml version="1.0"?><speak version="1.0" xml:lang="en-us"><voice xml:lang="en-us" name="Microsoft Server Speech Text to Speech Voice (${transformConfig.selectedVoice.locale}, ${transformConfig.selectedVoice.code})"><prosody rate="-20.00%">${text}</prosody></voice></speak>`;

        const options = {
            method: 'POST',
            baseUrl: `https://${this.config.ttsHost}/`,
            url: '/cognitiveservices/v1',
            headers: {
                Authorization: `Bearer ${accessToken}`,
                'cache-control': 'no-cache',
                'User-Agent': 'YOUR_RESOURCE_NAME',
                'X-Microsoft-OutputFormat': 'audio-24khz-48kbitrate-mono-mp3',
                'Content-Type': 'application/ssml+xml',
            },
            body,
            maxAttempts: this.retry,
            retryDelay: this.delayMS,
            retryStrategy: this.retryStrategy,
        };

        // Pass-through stage that prints one dot per chunk so progress is visible.
        const reportProgress = new Transform({
            transform(chunk, encoding, callback) {
                process.stdout.write('.');
                callback(null, chunk);
            }
        });

        // `new Promise` adapts the event-based stream API: the promise settles
        // on the first of 'finish' (success) or any failure signal.
        return new Promise((resolve, reject) => {
            // pipe() does not forward errors, so each stage needs its own listener.
            reportProgress.on('error', reject);
            writableStream.on('error', reject);
            writableStream.on('finish', () => {
                console.log('Done');
                resolve();
            });

            // request has binary audio file
            // NOTE(review): these listeners are attached to the object returned by
            // this one rp() call; whether later retry attempts are observed through
            // them should be confirmed against requestretry.
            rp(options)
                .on('error', reject)
                .on('response', (response) => {
                    // Only audio from a successful response may reach the destination.
                    // On rejection the writable stream is left untouched for the
                    // caller to dispose of.
                    if (response.statusCode !== 200) {
                        reject(new Error(`Response statusCode ${response.statusCode}`));
                        return;
                    }

                    response.on('error', reject);
                    response.pipe(reportProgress).pipe(writableStream);
                });
        });
    }

    /**
     * Fetches an access token, then runs the text-to-speech request with it.
     *
     * @param transformConfig - passed through to textToSpeech
     * @param text - text to speak
     * @param writableStream - destination for the audio bytes
     * @throws rethrows any failure after logging it
     */
    async transform(transformConfig, text, writableStream) {
        try {
            // get token - access token is good for 10 minutes
            const accessToken = await this.getAccessToken();

            // get binary and return in in/out writableStream
            await this.textToSpeech(accessToken, transformConfig, text, writableStream);

            console.log("transform done");
        } catch (err) {
            console.log(`transform error - ${err}`);
            throw err;
        }
    }
}


module.exports = TextToSpeech;