I have an architecture question regarding the union of more than two streams in Apache Flink.
We have three (and sometimes more) streams that act as code books with which we have to enrich the main stream. The code book streams are compacted Kafka topics. Code books are something that doesn't change often, e.g. currencies. The main stream is a fast event stream. Our goal is to enrich the main stream with the code books.
There are three possible ways as I see it to do it:
- Make a union of all code books and then join it with the main stream, storing the enrichment data as managed, keyed state (so that when the compacted events expire from Kafka, I still have the code books saved in state). This is the only way I have tried so far. I deserialized the Kafka topic messages, which are in JSON, to POJOs, e.g. Currency, OrganizationUnit and so on, and made one big wrapper class CodebookData with all the code books, e.g.:
public class CodebookData {
    private Currency currency;
    private OrganizationUnit organizationUnit;
    ...
}
Next I mapped the incoming stream of every Kafka topic to this wrapper class and then made a union:
DataStream<CodebookData> enrichedStream = mappedCurrency.union(mappedOrgUnit).union(mappedCustomer);
When I print CodebookData, it is populated like this:
CodebookData{
    Currency={populated with data},
    OrganizationUnit=null,
    Customer=null
}
CodebookData{
    Currency=null,
    OrganizationUnit={populated with data},
    Customer=null
}
...
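To make these partially populated union events concrete, here is a minimal sketch (with simplified, hypothetical POJOs standing in for my real classes) of how I imagine folding them into a single state object by copying over only the non-null fields:

```java
// Hypothetical, stripped-down POJOs standing in for the real codebook classes.
class Currency {
    String currencyId;
    String name;
}

class OrganizationUnit {
    String orgId;
    String name;
}

class CodebookData {
    Currency currency;
    OrganizationUnit organizationUnit;

    /**
     * Copy every non-null field from an incoming partial union event
     * into this (state) object, leaving the other fields untouched.
     */
    void mergeFrom(CodebookData update) {
        if (update.currency != null) {
            this.currency = update.currency;
        }
        if (update.organizationUnit != null) {
            this.organizationUnit = update.organizationUnit;
        }
    }
}
```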
Here I stopped, because I have the problem of how to connect this codebook stream with the main stream and save the codebook data in value state. I do not have a unique foreign key in my codebook data, because every code book has its own foreign key that connects it with the main stream, e.g. Currency has currencyId, OrganizationUnit has orgId and so on. E.g. I want to do something like this:
SingleOutputStreamOperator<CanonicalMessage> enrichedMainStream = mainStream
        .connect(enrichedStream)
        .keyBy(?????)
        .process(new MyKeyedCoProcessFunction());
and in MyKeyedCoProcessFunction I would create a ValueState of type CodebookData.
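For completeness, the process function I have in mind looks roughly like this (just a sketch; the enrichment and merge logic are elided, and the open question is still what to key by):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: this assumes the two streams can somehow be keyed by a
// common String value, which is exactly the part I am unsure about,
// since each code book has its own foreign key.
public class MyKeyedCoProcessFunction
        extends KeyedCoProcessFunction<String, CanonicalMessage, CodebookData, CanonicalMessage> {

    private transient ValueState<CodebookData> codebookState;

    @Override
    public void open(Configuration parameters) {
        codebookState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("codebooks", CodebookData.class));
    }

    @Override
    public void processElement1(CanonicalMessage msg, Context ctx,
                                Collector<CanonicalMessage> out) throws Exception {
        CodebookData codebooks = codebookState.value();
        if (codebooks != null) {
            // ... enrich msg from the stored codebook data ...
        }
        out.collect(msg);
    }

    @Override
    public void processElement2(CodebookData update, Context ctx,
                                Collector<CanonicalMessage> out) throws Exception {
        CodebookData codebooks = codebookState.value();
        if (codebooks == null) {
            codebooks = new CodebookData();
        }
        // ... copy the non-null fields of update into codebooks ...
        codebookState.update(codebooks);
    }
}
```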
Is this totally wrong, or can I do something with this? And if it is doable, what am I doing wrong?
The second approach is cascading a series of two-input CoProcessFunction operators, one per Kafka event source, but I have read somewhere that this is not an optimal approach.
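As I understand it, the cascading approach would look roughly like this (a sketch with hypothetical stream names, key selectors and enrichment functions), since here each stage can be keyed by that code book's own foreign key:

```java
// Each code book gets its own keyed connect, keyed by that code book's
// foreign key; the enrichment stages are chained one after another.
// CurrencyEnrichmentFunction / OrgUnitEnrichmentFunction are hypothetical
// KeyedCoProcessFunctions that keep one codebook entry in keyed state.
DataStream<CanonicalMessage> withCurrency = mainStream
        .connect(currencyStream)
        .keyBy(msg -> msg.getCurrencyId(), currency -> currency.getCurrencyId())
        .process(new CurrencyEnrichmentFunction());

DataStream<CanonicalMessage> withOrgUnit = withCurrency
        .connect(orgUnitStream)
        .keyBy(msg -> msg.getOrgId(), orgUnit -> orgUnit.getOrgId())
        .process(new OrgUnitEnrichmentFunction());
// ... and so on, one stage per code book.
```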
The third approach is broadcast state, which I am not very familiar with. For now I see one problem: since I am using RocksDB for checkpoints and savepoints, I am not sure whether I can use broadcast state with it.
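From the documentation, I understand the broadcast pattern to look roughly like this (again just a sketch; deriveKey and the enrichment logic are placeholders, and I assume getters exist on CodebookData):

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: the slow codebook stream is broadcast to every parallel
// instance of the enrichment operator, so no common key is needed.
public class CodebookBroadcastEnricher
        extends BroadcastProcessFunction<CanonicalMessage, CodebookData, CanonicalMessage> {

    // Shared between broadcast() and this function; the String key could
    // name the codebook type, so all code books fit in one map.
    public static final MapStateDescriptor<String, CodebookData> CODEBOOK_DESCRIPTOR =
            new MapStateDescriptor<>("codebooks", String.class, CodebookData.class);

    @Override
    public void processElement(CanonicalMessage msg, ReadOnlyContext ctx,
                               Collector<CanonicalMessage> out) throws Exception {
        // ... look up entries via ctx.getBroadcastState(CODEBOOK_DESCRIPTOR)
        // and enrich msg ...
        out.collect(msg);
    }

    @Override
    public void processBroadcastElement(CodebookData update, Context ctx,
                                        Collector<CanonicalMessage> out) throws Exception {
        // Store/overwrite the codebook entry on every parallel instance.
        ctx.getBroadcastState(CODEBOOK_DESCRIPTOR).put(deriveKey(update), update);
    }

    private String deriveKey(CodebookData update) {
        // Placeholder: derive the map key from whichever field is non-null.
        return update.getCurrency() != null ? "currency" : "organizationUnit";
    }
}
```

which would then be wired up as `mainStream.connect(codebookStream.broadcast(CodebookBroadcastEnricher.CODEBOOK_DESCRIPTOR)).process(new CodebookBroadcastEnricher())`.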
Should I use some other approach instead of approach no. 1, with which I am currently struggling?