0
votes

I'm using Flink 1.11 (via the Python API and an Anaconda virtual environment) with Kafka as both my source and sink. I'm submitting my Flink jobs to a cluster. All are being ran on Docker (locally).

As I'm new to it, I have it set up now that it's essentially working as pass-through using some windowing and slowly build it up.

My setup so far:

if __name__ == "__main__":

  env = StreamExecutionEnvironment.get_execution_environment()
  env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
  table_config = TableConfig()
  table_env = StreamTableEnvironment.create(env, table_config)

  ## Setup environment
  # Use our previously configured Anaconda environment
  table_env.add_python_archive("venv.zip")
  table_env.get_config().set_python_executable("venv.zip/venv/bin/python")

  shared_fields = {'a': DataTypes.STRING(), 'b': DataTypes.STRING(), 'c': DataTypes.STRING()}

  source_data_topic = "eddn_topic"

  table_env.connect( 
      Kafka()
      .version("0.11")
      .topic("test_sink")
      .property("bootstrap.servers", bootstrap_host)
    ) \
    .with_format(
      Json()
      .fail_on_missing_field(False)
    ) \
    .with_schema(
        Schema()
        .fields(shared_fields)
    ) \
    .create_temporary_table("stream_sink") \

  source_ddl = f"""
          CREATE TABLE testSource(
              a STRING,
              b STRING,
              c STRING,
              `timestamp` TIMESTAMP(3),
              WATERMARK FOR `timestamp` AS `timestamp`
          ) with (
              'connector' = 'kafka-0.11',
              'properties.bootstrap.servers' = '{bootstrap_host}',
              'topic' = 'test_source',
              'properties.group.id' = 'testGroup',
              'format' = 'json',
              'scan.startup.mode' = 'latest-offset',
              'json.fail-on-missing-field' = 'false',
              'json.timestamp-format.standard' = 'ISO-8601',
              'json.ignore-parse-errors' = 'false'
          )
          """

  table_env.execute_sql(source_ddl)

  # Setup a 10-second Tumbling window
  table = table_env.from_path("testSource") \
            .select("a, b, c, timestamp") \
            .window(Tumble.over("10.second").on("timestamp").alias("testWindow")) \
            .group_by("testWindow, a, b, c") \
            .select("*")

Yes, I'm mixing execute_sql() and connect() to setup my tables, but that's for my learning purposes.

From here, this works fine and messages appear in the new Kafka topic:

  table.insert_into("stream_sink") 
  table_env.execute("TestEnrichmentJob")

However, even doing a conversion to a Pandas DataFrame and back doesn't produce messages:

  pandasTable = table.to_pandas()
  enriched_table = table_env.from_pandas(pandasTable, [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()])
  enriched_table.insert_into("stream_sink") 

  table_env.execute("TestEnrichmentJob")

Watching the job in the Flink web interface shows that this sink task is receiving data but not sending any (nor does the job fail, just continues to run). Kafka shows that messages are being consumed from the source topic but not being produced in the sink topic.

I feel like I'm missing something obvious, being new to streaming data.

  1. Am I missing something?
  2. Once I need to do more advanced operations, do I need to implement it as a Python UDFs? Or can it just be written as "normal" Pandas operations?
1
FWIW, a tutorial on this is being prepared: github.com/apache/flink/pull/13273/files. Hopefully this will get merged soon and become available in the master docs.David Anderson

1 Answers

1
votes

One thing you are missing: Flink sources never show any records coming in, and Flink sinks never show any records going out. In other words, the numRecordsIn, numRecordsOut, numRecordsInPerSecond, and numRecordsOutPerSecond metrics shown in the Flink web UI only measure traffic within Flink, and ignore communication with external systems such as Kafka.

Edit:

I haven't tried it myself, but a forthcoming tutorial has an example showing this being done as:

enriched_table.execute_insert("stream_sink").get_job_client().get_job_execution_result().result()