1
votes

I have a large file, with a JSON record on each line. I'm writing a script to upload a subset of these records to CouchDB via the API, and experimenting with different approaches to see what works fastest. Here's what I've found, from fastest to slowest (on a CouchDB instance on my localhost):

  1. Read each needed record into memory. After all records are in memory, generate an upload coroutine for each record, and gather/run all the coroutines at once

  2. Synchronously read file and when a needed record is encountered, synchronously upload

  3. Use aiofiles to read the file, and when a needed record is encountered, asynchronously upload it

Approach #1 is much faster than the other two (about twice as fast). I am confused why approach #2 is faster than #3, especially in contrast to this example here, which takes half as much time to run asynchronously as synchronously (the sync code isn't provided, so I had to rewrite it myself). Is it the context switching between file I/O and HTTP I/O, especially with file reads occurring much more often than API uploads?

For additional illustration, here's some Python pseudo-code that represents each approach:

Approach 1 - Sync File IO, Async HTTP IO

import json
import asyncio
import aiohttp

records = []
with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            records.append(record)

async def batch_upload(records):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for record in records:
            task = async_upload(record, session)
            tasks.append(task)  
        await asyncio.gather(*tasks)

asyncio.run(batch_upload(records))

Approach 2 - Sync File IO, Sync HTTP IO

import json

with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            sync_upload(record)

Approach 3 - Async File IO, Async HTTP IO

import json
import asyncio
import aiohttp
import aiofiles

async def batch_upload():
    async with aiohttp.ClientSession() as session:
        async with aiofiles.open('records.txt', 'r') as record_file:
            line = await record_file.readline()
            while line:
                record = json.loads(line)
                if valid(record):
                    await async_upload(record, session)
                line = await record_file.readline()

asyncio.run(batch_upload())

The file I'm developing this with is about 1.3 GB, with 100,000 records total, 691 of which I upload. Each upload begins with a GET request to see if the record already exists in CouchDB. If it does, a PUT is performed to update the CouchDB record with any new information; if it doesn't, the record is POSTed to the db. So each upload consists of two API requests. For dev purposes, I'm only creating records, so I only run the GET and POST requests: 1,382 API calls total.
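For completeness, async_upload looks roughly like this. It's only a sketch: it assumes each record already carries its CouchDB _id, and the database URL is made up (sync_upload is the same logic with requests).

import aiohttp

COUCH_URL = 'http://localhost:5984/records_db'  # hypothetical database URL

async def async_upload(record, session):
    # GET to see whether the doc already exists in CouchDB
    doc_url = f"{COUCH_URL}/{record['_id']}"
    async with session.get(doc_url) as resp:
        if resp.status == 200:
            # existing doc: take its current _rev and PUT the updated record
            record['_rev'] = (await resp.json())['_rev']
            async with session.put(doc_url, json=record) as put_resp:
                put_resp.raise_for_status()
        elif resp.status == 404:
            # new doc: POST it to the database
            async with session.post(COUCH_URL, json=record) as post_resp:
                post_resp.raise_for_status()
        else:
            resp.raise_for_status()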

Approach #1 takes about 17 seconds, approach #2 takes about 33 seconds, and approach #3 takes about 42 seconds.

1
Could you add some timings? How many HTTP requests are you performing? I know they're dependent on your system/network, but it's useful for figuring out your use case. – Sam Mason
I updated the post with timings and request data. – James Kelleher

1 Answer

1
votes

Your code uses async, but it still does the work sequentially: each upload is awaited before the next line is read, so in this case it will be slower than the sync approach. Async won't speed up execution if it isn't constructed and used effectively.

You can create two coroutines, a reader and an uploader, and run them concurrently; that should speed up the operation.

Example:

#!/usr/bin/env python3

import asyncio


async def upload(event, queue):
    # Keep consuming until the reader has signalled completion AND the
    # queue has been drained; a production version might use a sentinel
    # value for shutdown instead, but this gives the idea.
    while not (event.is_set() and queue.empty()):
        record = await queue.get()
        print(f'uploading record : {record}')


async def read(event, queue):
    # dummy logic: in the real script, read the file here and put valid records on the queue
    for i in range(1, 10):
        await queue.put(i)
    # Initiate shutdown..
    event.set()


async def main():
    event = asyncio.Event()
    queue = asyncio.Queue()

    uploader = asyncio.create_task(upload(event, queue))
    reader = asyncio.create_task(read(event, queue))
    tasks = [uploader, reader]

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())
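
Applied to your actual workload, the same producer/consumer idea might look roughly like the sketch below. It's only an illustration: it assumes the valid() and async_upload() helpers from your pseudo-code, replaces the Event with None sentinels for shutdown, and runs several uploader workers so the HTTP requests overlap while the file is still being read.

import json
import asyncio
import aiohttp
import aiofiles

N_WORKERS = 10  # number of concurrent uploaders; tune for your CouchDB instance

async def reader(queue):
    # producer: stream the file and queue every valid record
    async with aiofiles.open('records.txt', 'r') as record_file:
        async for line in record_file:
            record = json.loads(line)
            if valid(record):
                await queue.put(record)
    # one sentinel per worker signals shutdown
    for _ in range(N_WORKERS):
        await queue.put(None)

async def uploader(queue, session):
    # consumer: upload records until the sentinel arrives
    while True:
        record = await queue.get()
        if record is None:
            break
        await async_upload(record, session)

async def main():
    # bound the queue so the reader can't get too far ahead of the uploaders
    queue = asyncio.Queue(maxsize=100)
    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(uploader(queue, session)) for _ in range(N_WORKERS)]
        await reader(queue)
        await asyncio.gather(*workers)

asyncio.run(main())

With 691 records and two HTTP requests each, most of the time is spent waiting on CouchDB, so the speedup in your approach #1 comes from letting several uploads be in flight at once, not from how the file is read.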