
I am reading https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/.

It says that:

As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as normal Kafka message values, and write DELETE data as Kafka messages with null values (indicating a tombstone for the key).

It doesn't mention what happens if an UPDATE_BEFORE message is written to upsert-kafka.

The same page (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/#full-example) provides a full example:

INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

With the above INSERT/SELECT statement, INSERT/UPDATE_BEFORE/UPDATE_AFTER messages will be generated and sent to the upsert-kafka sink, so I would like to ask: what happens when the upsert-kafka sink receives an UPDATE_BEFORE message?

1 Answer


From the comments in the source code:

        // partial code
        // In upsert mode, during serialization, if the row's RowKind is
        // RowKind.DELETE or RowKind.UPDATE_BEFORE, the Kafka record value is
        // set to null (a tombstone message for the key).
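In other words, an UPDATE_BEFORE row is handled exactly like a DELETE: the sink emits a tombstone record for the key. Below is a minimal sketch of that rule, assuming a made-up class name and a hypothetical value serializer; only RowData and RowKind are real Flink types here:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;
    import java.util.function.Function;

    // Illustrative sketch only, not Flink's actual sink class.
    class UpsertValueSketch {
        private final Function<RowData, byte[]> valueSerializer; // hypothetical

        UpsertValueSketch(Function<RowData, byte[]> valueSerializer) {
            this.valueSerializer = valueSerializer;
        }

        /** Returns the Kafka record value for a changelog row in upsert mode. */
        byte[] serializeValue(RowData row) {
            RowKind kind = row.getRowKind();
            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
                // Tombstone: a Kafka record with the key and a null value.
                return null;
            }
            // INSERT and UPDATE_AFTER become normal Kafka record values.
            return valueSerializer.apply(row);
        }
    }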

https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=165221669#content/view/165221669

The upsert-kafka sink doesn't require the planner to send UPDATE_BEFORE messages (the planner may still send UPDATE_BEFORE messages in some cases). It will write INSERT/UPDATE_AFTER messages as normal Kafka records with key parts, and will write DELETE messages as Kafka records with null values (indicating a tombstone for the key). Flink will guarantee the message ordering on the primary key by partitioning data on the values of the primary key columns.
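To see why partitioning on the primary key preserves ordering: Kafka guarantees order only within a partition, and hashing the key bytes sends every change for the same key to the same partition. A simplified illustration (Kafka's default partitioner actually uses murmur2 on the key bytes; Arrays.hashCode below is just a stand-in):

    import java.util.Arrays;

    // Same key bytes => same partition => per-key ordering is preserved.
    class KeyPartitionSketch {
        static int partitionFor(byte[] keyBytes, int numPartitions) {
            // Mask the sign bit so the result is non-negative.
            return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
        }
    }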

The upsert-kafka source is a kind of changelog source. Primary key semantics on a changelog source mean that the materialized changelog (INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) is unique on the primary key constraint. Flink assumes all messages are in order on the primary key.

Implementation Details

The upsert-kafka connector only produces an upsert stream, which doesn't contain UPDATE_BEFORE messages. However, several operations require UPDATE_BEFORE messages for correct processing, e.g. aggregations. Therefore, a physical node is needed to materialize the upsert stream and generate a changelog stream with full change messages. In the physical operator, state is used to determine whether the key is being seen for the first time. The operator will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for the previous image, or produce DELETE rows with all columns filled with values.
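A rough sketch of that materialization step, using a plain HashMap in place of Flink's keyed state and made-up names throughout (in Flink this is done by an internal physical operator, not user code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only: turns an upsert stream (latest value or null
    // tombstone per key) back into a full changelog with UPDATE_BEFORE rows.
    class UpsertMaterializerSketch<K, V> {
        record Change<V>(String kind, V value) {} // INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE

        private final Map<K, V> lastValue = new HashMap<>(); // stand-in for keyed state

        /** value == null means an upsert-kafka tombstone for the key. */
        List<Change<V>> process(K key, V value) {
            List<Change<V>> out = new ArrayList<>();
            V previous = lastValue.get(key);
            if (value == null) {
                if (previous != null) {
                    // Emit DELETE carrying the previous image, all columns filled.
                    out.add(new Change<>("DELETE", previous));
                    lastValue.remove(key);
                }
            } else if (previous == null) {
                // First time this key is seen: a plain INSERT.
                out.add(new Change<>("INSERT", value));
                lastValue.put(key, value);
            } else {
                // Key seen before: retract the previous image, then add the new one.
                out.add(new Change<>("UPDATE_BEFORE", previous));
                out.add(new Change<>("UPDATE_AFTER", value));
                lastValue.put(key, value);
            }
            return out;
        }
    }

Running the pageviews_per_region topic through a step like this is what lets downstream operations such as aggregations consume upsert-kafka output as if it were a full changelog.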