
I am trying to aggregate data into Elasticsearch from Kafka messages, consumed through the Flink 1.10 DataStream API. The data arrives in JSON format and is dynamic; a sample is given below. I want to combine multiple records into a single document per unique ID. The records arrive in order, and this is time-series data.

The source is Kafka and the destination (sink) is Elasticsearch 7.6.1.

I have not found any good example that can be applied to the problem statement below.

Record 1:
{
  "ID" : "1",
  "timestamp" : "2020-05-07 14:34:51.325",
  "Data" : {
    "Field1" : "ABC",
    "Field2" : "DEF"
  }
}

Record 2:
{
  "ID" : "1",
  "timestamp" : "2020-05-07 14:34:51.725",
  "Data" : {
    "Field3" : "GHY"
  }
}

Result:

{
  "ID" : "1",
  "Start_timestamp" : "2020-05-07 14:34:51.325",
  "End_timestamp" : "2020-05-07 14:34:51.725",
  "Data" : {
    "Field1" : "ABC",
    "Field2" : "DEF",
    "Field3" : "GHY"
  }
}

Version details:

  1. Flink 1.10
  2. Flink-kafka-connector 2.11
  3. Flink-Elasticsearch-connector 7.x
  4. Kafka 2.11
  5. JDK 1.8

1 Answer


What you're asking for could be described as some sort of join, and there are many ways you might accomplish this with Flink. There's an example of stateful enrichment in the Apache Flink Training that shows how to implement a similar join using a RichFlatMapFunction that should help you get started. You'll want to read through the relevant training materials first -- at least the section on Data Pipelines & ETL.

What you'll end up doing with this approach is to partition the stream by ID (via keyBy), and then use key-partitioned state (probably MapState in this case, assuming you have several attribute/value pairs to store for each ID) to store information from records like record 1 until you're ready to emit a result.
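To make the per-key merge logic concrete, here is a minimal sketch. The record shape and field names (ID, timestamps, attribute/value pairs) are taken from the sample records in the question; everything else is an assumption for illustration. A plain HashMap stands in for Flink's keyed state so the logic can be shown without the Flink runtime — in the actual job this would live inside a RichFlatMapFunction applied after keyBy, with the per-ID map held in MapState (or the whole accumulator in ValueState) obtained from getRuntimeContext().

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Accumulated document for one ID (shape assumed from the sample records). */
class MergedDoc {
    String id;
    String startTimestamp;  // timestamp of the first record seen for this ID
    String endTimestamp;    // timestamp of the latest record seen for this ID
    Map<String, String> data = new LinkedHashMap<>();
}

/**
 * Sketch of the merge logic only. In the real Flink job, the HashMap below
 * is replaced by keyed state inside a RichFlatMapFunction, so that after
 * keyBy(record -> record.id) each parallel instance only sees its own keys.
 */
class MergeByIdSketch {
    private final Map<String, MergedDoc> stateById = new HashMap<>();

    /** Folds one incoming record into the accumulated document for its ID. */
    public MergedDoc accept(String id, String timestamp, Map<String, String> fields) {
        MergedDoc doc = stateById.get(id);
        if (doc == null) {
            doc = new MergedDoc();
            doc.id = id;
            doc.startTimestamp = timestamp;  // first record fixes Start_timestamp
            stateById.put(id, doc);
        }
        doc.endTimestamp = timestamp;  // latest record advances End_timestamp
        doc.data.putAll(fields);       // later fields extend (or overwrite) earlier ones
        return doc;
    }
}
```

Feeding the two sample records through this logic yields the merged result shown in the question: Start_timestamp from record 1, End_timestamp from record 2, and the union of the Data fields. The open design question is when to emit the accumulated document to Elasticsearch — on every update (relying on the document ID to upsert), after a timer fires, or when some end-of-sequence marker arrives.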

BTW, if the set of keys is unbounded, you'll need to take care that you don't keep this state forever. Either clear the state when it's no longer needed (as this example does), or use State TTL to arrange for its eventual deletion.
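If State TTL is the right fit, the configuration looks roughly like this in Flink 1.10. The state descriptor name and the one-hour retention period are placeholders — tune the retention to how long a given ID can keep producing records:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Retention period is a placeholder; state is refreshed on every write,
// and expired entries are never returned to the function.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

// Hypothetical descriptor for the per-ID attribute/value pairs.
MapStateDescriptor<String, String> descriptor =
        new MapStateDescriptor<>("merged-fields", String.class, String.class);
descriptor.enableTimeToLive(ttlConfig);
```

The descriptor is then passed to getRuntimeContext().getMapState(descriptor) in the function's open() method, and expired entries are cleaned up by Flink in the background.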

For more information on other kinds of joins in Flink, see the links in this answer.