0
votes

I've got a PostgreSQL table with several millions of rows that need to be processed with the same algorithm. I am using Python and SQLAlchemy.Core for this task.

This algorithm accepts one or several rows as input and returns the same amount of rows with some updated values.

id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9
...
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz

I am using a PC cluster to perform this task. This cluster runs dask.distributed scheduler and workers.

I think, that this task can be effectively implemented with the map function. My idea is that each worker queries data base, selects for processing some rows with NULL values, then updates them with results.

My question is: how to write the SQL query, that would allow to distribute pieces of the table among workers?

I've tried to define subsets of row for each worker with offset and limit in SQL queries, that each worker emits:

SQL:

select * from table where value1 is NULL offset N limit 100;
...
update table where id1 = ... and id2 = ...
       set value1 = value...; 

Python:

from sqlalchemy import create_engine, bindparam, select, func
from distributed import Executor, progress

def process(offset, limit):
    engine = create_engine(...)

    # get next piece of work
    query = select(...).where(...).limit(limit).offset(offset)

    rows = engine.execute([select]).fetchall()
    # process rows

    # submit values to table
    update_stmt = table.update().where(...).where(...).values(...) 
    up_values = ...
    engine.execute(update_stmt, up_values) 

if __name__ == '__main__':
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                       port=config('SERVER_PORT')))
    n_rows = count_rows_to_process()
    chunk_size = 100
    progress(e.map(process, range(0, n_rows, chunk_size)))

However, this didn't work.

The range function has returned list of offsets before calculations have started, and the map function has distributed them among workers before starting process function.

Then some workers have successfully finished processing their chunks of work, submitted their results to the table, and updated values.

Then new iteration begins, new SELECT ...WHERE value1 is NULL LIMIT 100 OFFSET ... query is sent to the data base, but the offset is now invalid, because it was calculated before the previous workers have updated the table. Amount of NULL values is now reduced, and a worker can receive empty set from the database.

I cannot use one SELECT query before starting calculations, because it will return huge table that doesn't fit in RAM.

SQLAlchemy manual also says that for distributed processing the engine instance should be created locally for each python process. Therefore, I cannot query the database once and send returned cursor to the process function.

Therefore, the solution is correct construction of SQL queries.

1
Are you sure that your algorithm will benefit from distributed processing? Is it significantly CPU-bound? Very often you can fit a significant portion (if not all) of the dataset in memory (even on the order of millions of rows) which ends up being faster if you use a single process than if you distributed it due to overhead. Regardless, instead of doing LIMIT and OFFSET (which is slow because it has to skip OFFSET rows), ORDER BY the primary key (assuming it's (id1, id2)) and do (id1, id2) BETWEEN (1, 2) AND (3, 4). - univerio
I don't see any alternative to distributed processing for my case. The algorithm is implemented in C and compiled in binary executables for PC and ARM, that are called from python. - wl2776

1 Answers

1
votes

One option to consider is randomization:

SELECT *
FROM table
WHERE value1 IS NULL
ORDER BY random()
LIMIT 100;

In worst case scenario you will have several workers calculating the same thing in parallel. If it does not bother you this is one of the most simple ways.

The other option is dedicating individual rows to the particular worker:

UPDATE table 
SET value1 = -9999 
WHERE id IN (
    SELECT id 
    FROM table 
    WHERE value1 IS NULL 
    ORDER BY random() 
    LIMIT 100
) RETURNING * ;

This way you "mark" the rows your particular worker has "taken" with -9999. All other workers will skip these rows as value1 IS NOT NULL any more. The risk here is that if the worker fails you will not have a simple way to get back to these rows - you would have to manually update them back to NULL.