Earlier I asked if Flink could create something from nothing and the answer is yes. Now I am looking more into the capabilities of Flink SQL specifically.

In SQL this kind of challenge is sometimes easy (e.g. SELECT 1 works in regular engines such as MySQL), but sometimes it is impossible; for example, Apache Pig cannot create something from nothing.

I am not sure about Flink SQL. The idea, of course, is that the ability to create something from nothing eliminates any hard dependency on other systems when you want to run a quick test or build a portable example.

For the sake of simplicity: assume I want to generate at least 1 message per second and don't mind what is inside.


My first thoughts:

  • A simple select statement will not do the trick, as you do not get any output when there is nothing to select.
  • Conceptually a count(*) over a time window could work, but I have not gotten it working yet. Perhaps it is by design that when there is nothing to count, the output is nothing (rather than 0).

Other than windowing I do not see anything in Flink SQL that has the concept of time, so outside that I suspect it will not be possible.

What I am not looking for:

  • Starting with one message and making more from it. Suppose I have a Kafka topic and a message enters; then it is likely trivial to keep looping it back and creating infinitely many messages. But my question is about how to start when Kafka is empty.
  • Flink code that is not SQL, or even other tooling.

2 Answers

4
votes

Even better than the datagen connector is flink-faker, which has been used in many of the examples in the Flink SQL Cookbook. I think you'll find these examples especially interesting.
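
As a rough illustration, a flink-faker table might look like the sketch below. It assumes the flink-faker jar is on the classpath and that its 'faker' connector accepts the options described in its README ('fields.<column>.expression' with Java Faker expressions, and 'rows-per-second'); the table and column names are made up for this example.

```sql
-- Hypothetical table; requires the flink-faker jar to be available to Flink.
CREATE TEMPORARY TABLE Heroes (
  hero_name  STRING,
  hero_power STRING
) WITH (
  'connector' = 'faker',
  -- Assumed option: throttle the source to about one row per second.
  'rows-per-second' = '1',
  -- Java Faker expressions that produce realistic-looking values.
  'fields.hero_name.expression'  = '#{superhero.name}',
  'fields.hero_power.expression' = '#{superhero.power}'
);

-- Streams generated rows without any external system such as Kafka.
SELECT hero_name, hero_power FROM Heroes;
```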

Flink SQL has powerful support for working with both event time and system time, including watermarking. For example, Flink SQL uses watermarks to determine which records can be expired from Flink state when handling streaming event time windows and interval joins.
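
To make the event-time part concrete, here is a minimal sketch that declares a watermark on a generated timestamp column and counts rows per tumbling window. The table name Ticks, its columns, and the interval values are assumptions chosen for illustration; the source is the built-in datagen connector, so nothing external is needed.

```sql
-- Hypothetical source table backed by the built-in datagen connector.
CREATE TABLE Ticks (
  id         BIGINT,
  event_time TIMESTAMP(3),
  -- Declare event_time as the event-time attribute, tolerating 1 second of lateness.
  WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);

-- Count the generated rows in 10-second event-time tumbling windows.
SELECT
  TUMBLE_START(event_time, INTERVAL '10' SECOND) AS window_start,
  COUNT(*) AS tick_count
FROM Ticks
GROUP BY TUMBLE(event_time, INTERVAL '10' SECOND);
```

A window only produces a result if at least one row falls into it, which is why a windowed count(*) over an empty source emits nothing rather than 0.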

For some additional Flink SQL operations that leverage time, see time versioned tables, pattern detection, temporal joins, and time-based lookup joins.

1
votes

You might find the “datagen” connector useful for this.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/datagen.html

For example:

CREATE TABLE Orders (
  order_number BIGINT,
  price        DECIMAL(32,2),
  buyer        ROW<first_name STRING, last_name STRING>,
  order_time   TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);
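
To get roughly one message per second, as the question asks, the generator can be throttled. This is a minimal sketch, assuming the option names from the datagen documentation linked above ('rows-per-second', 'fields.#.kind', 'fields.#.min', 'fields.#.max'); the table name Heartbeat and its columns are made up for illustration.

```sql
-- Hypothetical table that emits about one random row per second.
CREATE TABLE Heartbeat (
  id BIGINT,
  -- Computed column: processing time at which the row is read.
  ts AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  -- Without this option datagen emits rows much faster than once per second.
  'rows-per-second' = '1',
  'fields.id.kind' = 'random',
  'fields.id.min'  = '1',
  'fields.id.max'  = '1000'
);

-- An unbounded stream of generated rows, no Kafka topic or seed message required.
SELECT id, ts FROM Heartbeat;
```

With 'fields.id.kind' = 'sequence' (plus 'fields.id.start' and 'fields.id.end') the source becomes bounded and stops once the sequence is exhausted, which can be handy for a test that should terminate.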