
I'm relatively new to Flink, and today I encountered a problem while using Flink SQL on a Flink 1.11.3 session cluster.

Problem

I registered a source table that uses the JDBC PostgreSQL driver. I am trying to move some data from this online DB to AWS S3 in Parquet format. The table is huge (~43 GB). The job failed after around one minute and the task manager crashed without any warning, but my best guess is that the task manager ran out of memory.

My Observation

I found that when I run tableEnv.executeSql("select ... from huge_table limit 1000"), Flink attempts to scan the entire source table into memory and only applies the limit after that.

Question

Since I only care about the most recent few days of data, is there any way to limit how many rows a job scans, for example by filtering on the timestamp?

Appendix

Here is a minimal setup that reproduces the issue (lots of noise removed):

Env setup code

// Use the Blink planner in batch mode, since this is a bounded job
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);

Source table DDL in Flink SQL

CREATE TABLE source_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:postgresql://my.bank',
    'table-name'='transactions',
    'driver'='org.postgresql.Driver',
    'username'='username',
    'password'='password',
    'scan.fetch-size'='2000'
)

Sink table DDL in Flink SQL

CREATE TABLE sink_transactions (
    create_time TIMESTAMP,
    username STRING,
    delta_amount DOUBLE,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector'='filesystem',
    'path'='s3a://s3/path/to/transactions',
    'format'='parquet'
)

Insert query in Flink SQL

INSERT INTO sink_transactions
SELECT ts, username, CAST(amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions

1 Answer


Your observation is right: Flink doesn't support limit push-down optimization for the JDBC connector yet. There is a nearly merged PR to add this feature; it is planned for Flink 1.13, and you can cherry-pick the patch into your own build if you need it urgently. A workaround sketch follows the references below.

1. JIRA: FLINK-19650 Support the limit push down for the Jdbc

2. PR: https://github.com/apache/flink/pull/13800
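In the meantime, since you only care about the most recent days, one way to avoid scanning the whole table is to do the filtering on the database side: the JDBC connector simply issues a select against whatever relation 'table-name' points to, so it can read from a view as well as a table. Below is a minimal sketch under that assumption; the view name recent_transactions and the 7-day window are only illustrative, adjust them to your schema.

View DDL in PostgreSQL

-- Expose only the recent rows so Flink never reads the historical data
CREATE VIEW recent_transactions AS
SELECT txid, username, amount, ts
FROM transactions
WHERE ts >= NOW() - INTERVAL '7 days';

Source table DDL in Flink SQL, pointing at the view

CREATE TABLE source_recent_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:postgresql://my.bank',
    'table-name'='recent_transactions',
    'driver'='org.postgresql.Driver',
    'username'='username',
    'password'='password',
    'scan.fetch-size'='2000'
)

It may also be worth looking at the connector's scan.partition.column / scan.partition.num / scan.partition.lower-bound / scan.partition.upper-bound options, which split the scan into several parallel queries and reduce the memory pressure on any single task; whether that alone is enough depends on your setup.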