I have created two KStreams that I want to join. The two streams produce the following output:

Stream 1:

2    {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}

Stream 2:

26   {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}

I want to create a joined stream (inner join) of these two streams, so I have created the following KStream:

KStream<String, String> s_joined = s_order
        .join(s_order_item, (left,right) -> left + right,
                JoinWindows.of(Duration.ofSeconds(30)))
        .mapValues(value -> {
            String[] arrOfstr = value.split("(?<=})");
            JSONObject jl = new JSONObject(arrOfstr[0]);
            JSONObject jr = new JSONObject(arrOfstr[1]);
            JSONObject json = new JSONObject();
            Iterator<String> keys = jl.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jl.get(key));
            }
            keys = jr.keys();
            while(keys.hasNext()) {
                String key = keys.next();
                json.put(key, jr.get(key));
            }
            return json.toString();
        });

This KStream only performs the join and reformats the output message (merging the two JSON objects into one), nothing more.

Through one example I will explain what I want to do:

The following messages are published inside the window:

Stream 1

9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}

Stream 2

9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

Joined Stream

What is published

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

What I want to be published

9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}

In conclusion, I want to publish just the latest message inside the window, not all of them. Is this possible?

2 Answers

0
votes

You could call groupByKey, which returns a KGroupedStream, and then use its aggregation functions (for example reduce or aggregate) to keep only the value you need for each key. Please see the Kafka Streams DSL documentation for more info.
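
To make the "keep only the latest" idea concrete, here is a small plain-Java illustration of the reduction step. It does not use Kafka Streams at all: it only mimics what a reducer of the form (previous, latest) -> latest does to records that share a key. The names KeepLatestSketch, KEEP_LATEST and latestPerKey are made up for this sketch, and the shortened JSON values are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class KeepLatestSketch {
    // Same shape as a reducer: (value so far, new value) -> value to keep.
    static final BinaryOperator<String> KEEP_LATEST = (previous, latest) -> latest;

    // Folds {key, value} pairs in arrival order, keeping one value per key.
    static Map<String, String> latestPerKey(List<String[]> records) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String[] record : records) {
            // merge() stores the value for a new key, otherwise applies KEEP_LATEST.
            result.merge(record[0], record[1], KEEP_LATEST);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical, shortened versions of the joined records from the question.
        List<String[]> records = List.of(
                new String[] {"9", "{\"QUANTITY\":3}"},
                new String[] {"9", "{\"QUANTITY\":0}"},
                new String[] {"9", "{\"QUANTITY\":6}"});
        System.out.println(latestPerKey(records));
    }
}
```

In Kafka Streams the same lambda could be passed to reduce on the grouped stream. Note that a plain reduce still emits an update for every incoming record, so on its own it does not limit the output to one message per window.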

0
votes

I found the answer. The way to achieve what I want is to use suppress. In more detail: call groupByKey() on the KStream, apply a window with windowedBy, aggregate the grouped data, and finally call suppress so that only the final result of each window is emitted.

// s_joined is already a KStream, so it can be grouped directly
s_joined
        .groupByKey()
        .windowedBy(...)
        .aggregate(...)
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
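
For anyone who wants the elided parts filled in, below is one possible sketch, not necessarily what I ended up running. Everything not shown above is an assumption: the 30-second tumbling window, the zero grace period, the use of reduce instead of aggregate, and the class and method names LatestPerWindowSketch and latestPerWindow. It also assumes the default key and value serdes are configured as String, and it needs the kafka-streams library on the classpath.

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

class LatestPerWindowSketch {
    // Hypothetical helper: 's_joined' is the joined stream from the question.
    static KStream<String, String> latestPerWindow(KStream<String, String> s_joined) {
        return s_joined
                .groupByKey()
                // Tumbling windows; the 30 s size and zero grace period are assumptions.
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO))
                // Keep only the most recently processed value per key and window.
                .reduce((previous, latest) -> latest)
                // Hold back intermediate updates and emit one result when the window closes.
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                // Drop the window from the key so the output is keyed like the input.
                .selectKey((windowedKey, value) -> windowedKey.key());
    }
}
```

Two things to keep in mind. A window only closes when stream time advances, so the final record for a window is not emitted until newer records arrive. Also, TimeWindows.of is deprecated in recent releases, where TimeWindows.ofSizeAndGrace is the replacement.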