0
votes

I'm using Flink SQL to read Debezium Avro data from Kafka and store it as Parquet files in S3. Here is my code:

import os

from pyflink.datastream import StreamExecutionEnvironment, FsStateBackend
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, StreamTableEnvironment, \
    ScalarFunction

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
# start a checkpoint every 12 s
exec_env.enable_checkpointing(12000)
t_config = TableConfig()
t_env = StreamTableEnvironment.create(exec_env, t_config)

INPUT_TABLE = 'source'
KAFKA_TOPIC = os.environ['KAFKA_TOPIC']
KAFKA_BOOTSTRAP_SERVER = os.environ['KAFKA_BOOTSTRAP_SERVER']
OUTPUT_TABLE = 'sink'
S3_BUCKET = os.environ['S3_BUCKET']
OUTPUT_S3_LOCATION = os.environ['OUTPUT_S3_LOCATION']

ddl_source = f"""
       CREATE TABLE {INPUT_TABLE} (
           `event_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
           `id` BIGINT,
           `price` DOUBLE,
           `type` INT,
           `is_reinvite` INT
       ) WITH (
           'connector' = 'kafka',
           'topic' = '{KAFKA_TOPIC}',
           'properties.bootstrap.servers' = '{KAFKA_BOOTSTRAP_SERVER}',
           'scan.startup.mode' = 'earliest-offset',
           'format' = 'debezium-avro-confluent',
           'debezium-avro-confluent.schema-registry.url' = 'http://kafka-production-schema-registry:8081'
       )
   """

ddl_sink = f"""
       CREATE TABLE {OUTPUT_TABLE} (
           `event_time` TIMESTAMP,
           `id` BIGINT,
           `price` DOUBLE,
           `type` INT,
           `is_reinvite` INT
       ) WITH (
            'connector' = 'filesystem',
            'path' = 's3://{S3_BUCKET}/{OUTPUT_S3_LOCATION}',
            'format' = 'parquet'
       )
   """

# register both tables (execute_sql replaces the deprecated sql_update)
t_env.execute_sql(ddl_source)
t_env.execute_sql(ddl_sink)

t_env.execute_sql(f"""
    INSERT INTO {OUTPUT_TABLE}
    SELECT * 
    FROM {INPUT_TABLE}
""")

When I submit the job, I get the following error message:

pyflink.util.exceptions.TableException: Table sink 'default_catalog.default_database.sink' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, price, type, is_reinvite, timestamp])

I'm using Flink 1.12.1. The source works properly: I tested it with a 'print' connector as the sink. Here is a sample of the data, extracted from the task manager logs when using the 'print' connector as the table sink:

-D(2021-02-20T17:07:27.298,14091764,26.0,9,0)
-D(2021-02-20T17:07:27.298,14099765,26.0,9,0)
-D(2021-02-20T17:07:27.299,14189806,16.0,9,0)
-D(2021-02-20T17:07:27.299,14189838,37.0,9,0)
-D(2021-02-20T17:07:27.299,14089840,26.0,9,0)
-D(2021-02-20T17:07:27.299,14089847,26.0,9,0)
-D(2021-02-20T17:07:27.300,14189859,26.0,9,0)
-D(2021-02-20T17:07:27.301,14091808,37.0,9,0)
-D(2021-02-20T17:07:27.301,14089911,37.0,9,0)
-D(2021-02-20T17:07:27.301,14099937,26.0,9,0)
-D(2021-02-20T17:07:27.302,14091851,37.0,9,0)

How can I make my table sink work with the filesystem connector?

1 Answer

1
votes

What happens is that:

  • when receiving the Debezium records, Flink maintains a logical table by adding, updating and removing Flink rows based on their primary key.
  • the only sinks that can handle that kind of information are those that have a concept of update by key. JDBC is a typical example: it is straightforward for Flink to translate "the Flink row with key foo has been updated to bar" into "the JDBC row with key foo should be updated with value bar". The filesystem sink does not support that kind of operation, since files are append-only.

See also the Flink documentation on append and update queries.
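
To make the distinction concrete, here is a minimal plain-Python sketch (not Flink API) of how a changelog looks to the two kinds of sink. The row kinds `+I`, `-U`, `+U` and `-D` mirror the prefixes Flink prints for changelog rows (the `-D` in the question's 'print' output is one of them). The functions `apply_to_keyed_store` and `apply_to_append_only_log` and the sample rows are hypothetical, for illustration only.

```python
# Plain-Python model of a changelog; none of this is Flink API.
# Each change is (row_kind, key, value), with row kinds mirroring Flink's
# changelog prefixes: +I insert, -U update-before, +U update-after, -D delete.
changelog = [
    ("+I", 1, 26.0),
    ("+I", 2, 37.0),
    ("-U", 1, 26.0),
    ("+U", 1, 30.0),
    ("-D", 2, 37.0),
]


def apply_to_keyed_store(changes):
    """A sink with update-by-key semantics (for example a database table)."""
    store = {}
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            store[key] = value      # insert or overwrite by key
        elif kind == "-D":
            store.pop(key, None)    # delete by key
        # "-U" needs no action here: the following "+U" overwrites the row
    return store


def apply_to_append_only_log(changes):
    """An append-only sink (for example a file) can only accept inserts."""
    log = []
    for kind, key, value in changes:
        if kind != "+I":
            raise ValueError(f"append-only sink cannot consume {kind} for key {key}")
        log.append((key, value))
    return log


print(apply_to_keyed_store(changelog))

try:
    apply_to_append_only_log(changelog)
except ValueError as err:
    print(err)
```

The keyed store ends up holding only the current state of each key, while the append-only log has to reject the first non-insert change, which is roughly the situation the TableException in the question reports.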

In practice, in order to do the conversion, we first have to decide what exactly we want to have in this append-only file.

If we want the file to contain the latest version of each item every time an id is updated, then to my knowledge the way to go is to convert the table to a stream first, and then write that stream out with a FileSink. Note that in that case the result contains a boolean saying whether the row is updated or deleted, and we have to decide how this information should appear in the resulting file.

Note: I used this other CDC example from the Flink SQL cookbook to reproduce a similar setup:


// assuming a Flink retract table of claims built from a CDC stream:
tableEnv.executeSql("" +
        " CREATE TABLE accident_claims (\n" +
        "    claim_id INT,\n" +
        "    claim_total FLOAT,\n" +
        "    claim_total_receipt VARCHAR(50),\n" +
        "    claim_currency VARCHAR(3),\n" +
        "    member_id INT,\n" +
        "    accident_date VARCHAR(20),\n" +
        "    accident_type VARCHAR(20),\n" +
        "    accident_detail VARCHAR(20),\n" +
        "    claim_date VARCHAR(20),\n" +
        "    claim_status VARCHAR(10),\n" +
        "    ts_created VARCHAR(20),\n" +
        "    ts_updated VARCHAR(20)" +
        ") WITH (\n" +
        "  'connector' = 'postgres-cdc',\n" +
        "  'hostname' = 'localhost',\n" +
        "  'port' = '5432',\n" +
        "  'username' = 'postgres',\n" +
        "  'password' = 'postgres',\n" +
        "  'database-name' = 'postgres',\n" +
        "  'schema-name' = 'claims',\n" +
        "  'table-name' = 'accident_claims'\n" +
        " )"
);

// convert it to a stream
Table accidentClaims = tableEnv.from("accident_claims");
DataStream<Tuple2<Boolean, Row>> accidentClaimsStream = tableEnv
    .toRetractStream(accidentClaims, Row.class);

// and write to file
final FileSink<Tuple2<Boolean, Row>> sink = FileSink
    // TODO: adapt the output format here:
    .forRowFormat(new Path("/tmp/flink-demo"),
                        (Encoder<Tuple2<Boolean, Row>>) (element, stream) -> stream.write((element.toString() + "\n").getBytes(StandardCharsets.UTF_8)))
    .build();
accidentClaimsStream.sinkTo(sink);

streamEnv.execute();

Note that during the conversion, you obtain a boolean telling you whether that row is a new value for that accident claim or a deletion of that claim. My basic FileSink config here simply includes that boolean in the output, although how to handle deletions has to be decided case by case.

The result in the file then looks like this:

head /tmp/flink-demo/2021-03-09--09/.part-c7cdb74e-893c-4b0e-8f69-1e8f02505199-0.inprogress.f0f7263e-ec24-4474-b953-4d8ef4641998

(true,1,4153.92,null,AUD,412,2020-06-18 18:49:19,Permanent Injury,Saltwater Crocodile,2020-06-06 03:42:25,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,2,8940.53,IpsumPrimis.tiff,AUD,323,2019-03-18 15:48:16,Collision,Blue Ringed Octopus,2020-05-26 14:59:19,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,3,9406.86,null,USD,39,2019-04-28 21:15:09,Death,Great White Shark,2020-03-06 11:20:54,INITIAL,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,4,3997.9,null,AUD,315,2019-10-26 21:24:04,Permanent Injury,Saltwater Crocodile,2020-06-25 20:43:32,IN REVIEW,2021-03-09 06:39:28,2021-03-09 06:39:28)
(true,5,2647.35,null,AUD,74,2019-12-07 04:21:37,Light Injury,Cassowary,2020-07-30 10:28:53,REIMBURSED,2021-03-09 06:39:28,2021-03-09 06:39:28)
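
As mentioned above, how the boolean flag should show up in the file is a case-by-case decision. The following plain-Python sketch (not Flink API) models retract-stream elements as `(flag, row)` tuples, where `True` is an add message and `False` is a retract message, and shows two possible strategies. The function names `with_op_column` and `additions_only` and the sample rows are hypothetical.

```python
# Plain-Python model of retract-stream elements: (flag, row), where
# flag=True is an add message and flag=False is a retract message.
# The sample values are hypothetical.
retract_stream = [
    (True, (1, "IN REVIEW")),
    (False, (1, "IN REVIEW")),   # old version of claim 1 is retracted ...
    (True, (1, "REIMBURSED")),   # ... and replaced by the new version
    (False, (2, "INITIAL")),     # retraction with no replacement
]


def with_op_column(elements):
    """Strategy 1: keep every message and expose the flag as a column."""
    return [("ADD" if flag else "RETRACT",) + row for flag, row in elements]


def additions_only(elements):
    """Strategy 2: drop retractions and keep only new row versions."""
    return [row for flag, row in elements if flag]


for line in with_op_column(retract_stream):
    print(line)

for line in additions_only(retract_stream):
    print(line)
```

The first strategy keeps the file a faithful append-only history that a downstream reader can replay. The second is smaller, but it loses deletions, so it only fits cases where removed rows do not matter.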