1 vote

I have raw streams from 3 MySQL tables, 1 parent table and two child tables. I tried to join the three raw streams and transform them into a single output stream. It works when there is an update on the parent stream, but it does not trigger any output when something changes on a child stream.

    @StreamListener
    public KStream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
            @Input KStream<Long, Child1> child1Stream,
            @Input KStream<Long, Child2> child2Stream) {

        KTable<Long, Parent> parentTable = convertParent(parentStream);
        KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
        KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

        return parentTable
                .leftJoin(child1Table, (parent, child1List) -> new Output(parent, child1List))
                .leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
                .toStream();
    }

Any new add or update on the parent stream is picked up by the processor, joined with the other KTables, and returned on the output stream. But an add or update on child1Stream or child2Stream doesn't trigger any output.

I thought that by making all input streams into KTables, they would always store the latest changes, and since all of them have the same key, any update on the parent or child tables would be picked up by the joins. But that is not happening. Can anyone suggest what I am missing here?

I have already tried KStream-KStream, KStream-KTable, and KTable-KTable joins; none of them worked in the case of child updates.

-Thanks


2 Answers

2 votes

Can you show where you have the EnableBinding annotation and the processor interface that you are binding to?

This doesn't look right to me:

    @StreamListener
    public KStream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
            @Input KStream<Long, Child1> child1Stream,
            @Input KStream<Long, Child2> child2Stream) {

You are not specifying binding names on the inputs. You need something like this when you have multiple inputs:

    @StreamListener
    public KStream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
            @Input("input2") KStream<Long, Child1> child1Stream,
            @Input("input3") KStream<Long, Child2> child2Stream) {

Each of those inputs needs to be defined in the processor interface. See here for an example: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java#L46
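For illustration, here is a rough sketch of what such a bindings interface could look like (the JoinProcessor name and the binding names input1/input2/input3/output are placeholders, not something from your code; map them to topics via the spring.cloud.stream.bindings.* properties):

    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;

    // Each @Input/@Output name must match the names used in the
    // @StreamListener method signature and in the binding configuration.
    public interface JoinProcessor {

        @Input("input1")
        KStream<?, ?> input1();

        @Input("input2")
        KStream<?, ?> input2();

        @Input("input3")
        KStream<?, ?> input3();

        @Output("output")
        KStream<?, ?> output();
    }

You would then put @EnableBinding(JoinProcessor.class) on your configuration class and add @SendTo("output") on the @StreamListener method so that the returned KStream is routed to the output binding.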

0 votes

Notice how your child tables are created from the same stream as the parentTable:

    KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
    KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

Not sure what the convertChild1 and convertChild2 methods do, but shouldn't they be given child1Stream and child2Stream as arguments, respectively?
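If that is the case, the fix would be something along these lines (only a sketch; I'm assuming convertChild1 and convertChild2 aggregate the stream they are given into a KTable keyed by the parent id):

    // Build each KTable from its own input stream, not from the parent stream
    KTable<Long, Parent> parentTable = convertParent(parentStream);
    KTable<Long, ArrayList<Child1>> child1Table = convertChild1(child1Stream);
    KTable<Long, ArrayList<Child2>> child2Table = convertChild2(child2Stream);

With all three KTables keyed by the same id, an update arriving on either side of a KTable-KTable join should then produce an updated join result on the output stream.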