I've got a PostgreSQL table with several million rows that need to be processed with the same algorithm. I am using Python and SQLAlchemy Core for this task.
The algorithm accepts one or more rows as input and returns the same number of rows with some values updated:
id1, id2, NULL, NULL, NULL -> id1, id2, value1, value2, value3
id1, id3, NULL, NULL, NULL -> id1, id3, value4, value5, value6
id2, id3, NULL, NULL, NULL -> id2, id3, value7, value8, value9
...
id_n, id_m, NULL, NULL, NULL -> id_n, id_m, value_xxx, value_yyy, value_zzz
I am using a PC cluster to perform this task. The cluster runs a dask.distributed scheduler and workers.
I think this task can be implemented effectively with the map function. My idea is that each worker queries the database, selects some rows with NULL values for processing, and then updates them with the results.
My question is: how do I write an SQL query that distributes pieces of the table among the workers?
I've tried to define a subset of rows for each worker with OFFSET and LIMIT in the SQL queries that each worker emits:
SQL:
select * from table where value1 is NULL limit 100 offset N;
...
update table set value1 = value...
where id1 = ... and id2 = ...;
Python:
from sqlalchemy import create_engine, bindparam, select, func
from distributed import Executor, progress

def process(offset, limit):
    # each worker creates its own engine
    engine = create_engine(...)
    # get the next piece of work
    query = select(...).where(...).limit(limit).offset(offset)
    rows = engine.execute(query).fetchall()
    # process rows
    # submit the computed values to the table
    update_stmt = table.update().where(...).where(...).values(...)
    up_values = ...
    engine.execute(update_stmt, up_values)

if __name__ == '__main__':
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    n_rows = count_rows_to_process()
    chunk_size = 100
    offsets = list(range(0, n_rows, chunk_size))
    progress(e.map(process, offsets, [chunk_size] * len(offsets)))
However, this didn't work.
The range function returned the list of offsets before the calculations started, and the map function distributed them among the workers before the process function began.
Some workers then successfully finished processing their chunks of work, submitted their results to the table, and updated the values.
When the next SELECT ... WHERE value1 IS NULL LIMIT 100 OFFSET ... query is sent to the database, its offset is no longer valid: it was calculated before the previous workers updated the table, the number of NULL values has since shrunk, and a worker can receive an empty set from the database.
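To make the failure concrete, here is a toy illustration in plain Python (the numbers are hypothetical, and list slicing stands in for the SELECT queries):

# 300 rows are still NULL; workers were assigned fixed offsets 0, 100, 200.
null_rows = list(range(300))

# Worker A (offset 0) finishes first and fills in its 100 rows,
# so they drop out of the WHERE value1 IS NULL result set.
null_rows = null_rows[100:]            # 200 rows remain NULL

# Worker B (offset 100) now sees the shrunken set: it gets the rows
# originally numbered 200-299, while rows 100-199 are assigned to no one.
chunk_b = null_rows[100:100 + 100]     # rows 200-299

# Worker C (offset 200) receives an empty set: only 200 NULL rows remain.
chunk_c = null_rows[200:200 + 100]     # -> []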
I cannot use a single SELECT query issued before the calculations start, because it would return a huge table that doesn't fit in RAM.
The SQLAlchemy manual also says that for distributed processing, an engine instance should be created locally in each Python process. Therefore, I cannot query the database once and send the returned cursor to the process functions.
The solution, then, is to construct the SQL queries correctly.
Instead of LIMIT and OFFSET (which is slow because it has to skip OFFSET rows), ORDER BY the primary key (assuming it's (id1, id2)) and do (id1, id2) BETWEEN (1, 2) AND (3, 4). - univerio
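A minimal sketch of that suggestion, assuming a PostgreSQL dialect (which supports row-value comparisons), a SQLAlchemy Table object named table with composite primary key (id1, id2), and hypothetical helper names key_ranges and process_range:

from sqlalchemy import create_engine, select, tuple_

def key_ranges(engine, table, chunk_size):
    # One lightweight scan over just the key columns (unlike the full
    # rows, these should fit in RAM), cut into (lo, hi) boundary pairs.
    keys = engine.execute(
        select([table.c.id1, table.c.id2])
        .where(table.c.value1.is_(None))
        .order_by(table.c.id1, table.c.id2)
    ).fetchall()
    return [(keys[i], keys[min(i + chunk_size, len(keys)) - 1])
            for i in range(0, len(keys), chunk_size)]

def process_range(lo, hi):
    engine = create_engine(...)  # per-process engine, as above
    pk = tuple_(table.c.id1, table.c.id2)
    # Renders as (id1, id2) BETWEEN (:lo1, :lo2) AND (:hi1, :hi2):
    # the chunk is identified by fixed key boundaries, not by its
    # position in the result set, so concurrent updates cannot shift it.
    query = (
        select([table])
        .where(table.c.value1.is_(None))
        .where(pk.between(tuple_(*lo), tuple_(*hi)))
        .order_by(table.c.id1, table.c.id2)
    )
    rows = engine.execute(query).fetchall()
    # process rows and UPDATE them as before

Other workers can shrink the number of NULL rows inside a range, but they can never change which rows fall inside it, so no worker skips work or receives another worker's chunk. The boundary pairs can then be distributed with e.map(process_range, *zip(*ranges)) in place of the offsets.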