I'm relatively new to Flink, and today I ran into a problem while using Flink SQL on a Flink 1.11.3 session cluster.
Problem
I registered a source table that uses the JDBC Postgres driver, and I'm trying to move some data from this online DB to AWS S3 in Parquet format. The table is huge (~43 GB). The job failed after about a minute, and the task manager crashed without any warning; my best guess is that the task manager ran out of memory.
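If it matters, the usual knob here seems to be taskmanager.memory.process.size in flink-conf.yaml (the snippet below shows the value from the default shipped config, purely for illustration), but raising it would presumably just delay the failure if the whole table really is being materialized.

# flink-conf.yaml
taskmanager.memory.process.size: 1728m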
My Observation
I found that when I run

tableEnv.executeSql("select ... from huge_table limit 1000")

Flink attempts to scan the entire source table into memory and only applies the LIMIT afterwards, i.e. the LIMIT is not pushed down to the JDBC source.
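I assume an explicit filter would hit the same problem: I don't see filter pushdown mentioned for the JDBC source in the 1.11 docs, so presumably a predicate like the one below would also be evaluated in Flink only after the full scan (the timestamp literal is just an example).

SELECT txid, username, amount, ts
FROM source_transactions
WHERE ts >= TIMESTAMP '2021-01-01 00:00:00'
LIMIT 1000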
Question
Since I only care about the most recent few days of data, is there any way to limit how many rows a job scans, e.g. by filtering on the timestamp column?
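The closest thing I've found in the 1.11 JDBC connector docs is the scan.partition.* option group (the docs say all four options must be set together). If I read them correctly, each parallel split then queries only the rows where ts falls within its slice of [lower-bound, upper-bound], so rows outside the bounds would never be fetched. Below is an untested sketch of what I mean (source_transactions_recent is just a hypothetical name, and I'm not sure how the bounds should be written for a TIMESTAMP column, since the docs' examples use numeric columns):

CREATE TABLE source_transactions_recent (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://my.bank',
    'table-name' = 'transactions',
    'driver' = 'org.postgresql.Driver',
    'username' = 'username',
    'password' = 'password',
    'scan.fetch-size' = '2000',
    'scan.partition.column' = 'ts',
    'scan.partition.num' = '8',
    'scan.partition.lower-bound' = '<a few days ago>',
    'scan.partition.upper-bound' = '<now>'
)

Is that the intended use of these options, or is there a cleaner way?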
Appendix
Here is a minimal setup that reproduces the issue (lots of noise removed):
Env setup code
// Blink planner in batch mode: the job reads the bounded table once and terminates
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);
Source table DDL in Flink SQL
CREATE TABLE source_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://my.bank',
    'table-name' = 'transactions',
    'driver' = 'org.postgresql.Driver',
    'username' = 'username',
    'password' = 'password',
    'scan.fetch-size' = '2000'
)
Sink table DDL in Flink SQL
CREATE TABLE sink_transactions (
    create_time TIMESTAMP,
    username STRING,
    delta_amount DOUBLE,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://s3/path/to/transactions',
    'format' = 'parquet'
)
Insert query in Flink SQL
INSERT INTO sink_transactions
SELECT ts, username, CAST(amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
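For completeness, everything above runs from a single Java program via executeSql; sourceDdl, sinkDdl, and insertQuery below are hypothetical variable names holding the statements shown earlier.

tableEnv.executeSql(sourceDdl);   // register the JDBC source table
tableEnv.executeSql(sinkDdl);     // register the filesystem sink table
tableEnv.executeSql(insertQuery); // submits the batch job to the session cluster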