
I am facing an issue joining a KStream with a GlobalKTable and would appreciate your help.

Given two Kafka topics, orders and customers:

Orders

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

Customers

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

The requirement is to enrich the stream of orders with the customer name:

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}

I am trying the following:

  1. Build a KStream from the orders topic
  2. Build a GlobalKTable from the customers topic
  3. Build another stream that joins orders and customers (look up Order.CustID in the customer table)

KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

But it produces no output and throws no exception either.

When I use leftJoin instead, I get a NullPointerException because the Customer passed to the ValueJoiner is null.

If you have faced a similar issue, please let me know how to fix it.
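In a leftJoin, the ValueJoiner is called with a null table value whenever the lookup finds no matching key, so dereferencing `customer.CustName` throws. The sketch below models that behavior in plain Java: a `HashMap` stands in for the GlobalKTable (an assumption for illustration; this is not the Kafka Streams API), and `NullSafeJoinDemo`, `join` and `leftJoin` are hypothetical names. The POJO fields mirror the question's.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-ins for the question's POJOs (field names as in the question).
class Order {
    String ID, Name, CustID;
    Order(String id, String name, String custId) { ID = id; Name = name; CustID = custId; }
}

class Customer {
    String CustID, CustName;
    Customer(String custId, String custName) { CustID = custId; CustName = custName; }
}

class EnrichedOrder {
    String ID, Name, CustID, CustName;
}

class NullSafeJoinDemo {
    // Null-safe joiner: in a leftJoin, customer is null when no table entry matches.
    static EnrichedOrder join(Order order, Customer customer) {
        EnrichedOrder e = new EnrichedOrder();
        e.ID = order.ID;
        e.Name = order.Name;
        e.CustID = order.CustID;
        e.CustName = (customer == null) ? null : customer.CustName;
        return e;
    }

    // Hypothetical model of a left join: look up the order's CustID in a map
    // standing in for the GlobalKTable; a miss passes null to the joiner.
    static EnrichedOrder leftJoin(Order order, Map<String, Customer> table) {
        return join(order, table.get(order.CustID));
    }

    public static void main(String[] args) {
        Map<String, Customer> customers = new HashMap<>();
        customers.put("100", new Customer("100", "Customer1"));
        EnrichedOrder hit = leftJoin(new Order("1", "Myorder1", "100"), customers);
        EnrichedOrder miss = leftJoin(new Order("2", "MyOrder2", "200"), customers);
        System.out.println(hit.CustName);  // Customer1
        System.out.println(miss.CustName); // null
    }
}
```

A null check like this only stops the exception; it does not explain why the lookup misses in the first place.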

Make sure that the GlobalKTable is fully populated when stream processing starts. Orders may already be processed while the customer table is still being populated. To avoid this, start the Streams application first and only then produce new order events. You may also need to reset offsets when running your tests multiple times. – user152468
Is there any way to check whether the GlobalKTable is fully populated, something like a foreach loop on a KStream? – deepak
@deepak, could you provide the code you use to create the GlobalKTable? – dmkvl
@deepak, make sure your messages (orders and customers) have keys. By the way, do you really need a GlobalKTable (and not a KTable)? – dmkvl
@user152468, your initial concern about "loading" the table should not apply: on startup, a GlobalKTable is bootstrapped to the end of its topic before any processing begins. This is a difference from KTables, which provide time-synchronized joining/processing, whereas GlobalKTables are not time-synchronized. – Matthias J. Sax

2 Answers

3
votes

Let's look carefully at the content of your copy-paste:

In the customers topic:

"100"   {"CustID":"100","CustName":"Customer1"}

Notice that the key is a String that itself contains double quotes: "100". String keys are usually printed without quotes, so I would have expected to see:

 100    {"CustID":"100","CustName":"Customer1"}

In other words, your key is the Java string "\"100\"" (five characters, quotes included), not "100" as one would expect.
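A quick way to confirm the mismatch is to compare the two strings directly. In the sketch below (the class name `QuotedKeyDemo` is hypothetical), the table key keeps its embedded quotes and therefore does not equal the bare CustID.

```java
class QuotedKeyDemo {
    // The key as it appears in the customers topic: the quotes are part of the string.
    static final String TABLE_KEY = "\"100\"";
    // The CustID taken from the order's JSON value: no embedded quotes.
    static final String ORDER_CUST_ID = "100";

    public static void main(String[] args) {
        System.out.println(TABLE_KEY.length());              // 5
        System.out.println(ORDER_CUST_ID.length());          // 3
        System.out.println(TABLE_KEY.equals(ORDER_CUST_ID)); // false
    }
}
```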

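On the other hand, the value in your orders topic is JSON, {"ID":"1","Name":"Myorder1","CustID":"100"}, and its CustID attribute is a String whose Java representation is "100", with no embedded quotes.

When you join orders and customers, you try to match the order's CustID, 100, against the customer key "100" (quotes included). The lookup fails because the key contains double quotes that the CustID lacks. With join, an unmatched order is simply dropped, which is why you see no output; with leftJoin, the ValueJoiner is called with a null Customer, which is where the NullPointerException comes from.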
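The mismatch and two possible ways around it can be modeled without Kafka. In the plain-Java sketch below, a `HashMap` stands in for the GlobalKTable (an assumption for illustration only), and the key methods play the role of the KeyValueMapper passed to the join. It shows the original mapping missing, a workaround that wraps CustID in quotes so it matches the stored key, and a `stripQuotes` helper you could apply when producing the customer records so that keys are written as plain 100. All class and method names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class KeyFixDemo {
    // Hypothetical stand-in for the GlobalKTable: keys stored exactly as produced, quotes included.
    static Map<String, String> quotedTable() {
        Map<String, String> t = new HashMap<>();
        t.put("\"100\"", "Customer1");
        t.put("\"200\"", "Customer2");
        return t;
    }

    // Mirrors the question's KeyValueMapper: uses CustID as-is, so the lookup misses.
    static String naiveKey(String custId) {
        return custId;
    }

    // Workaround on the join side: wrap CustID in quotes so it matches the stored key.
    static String quotedKey(String custId) {
        return "\"" + custId + "\"";
    }

    // Alternative on the producer side: strip one pair of surrounding quotes
    // from the key before writing it to the customers topic.
    static String stripQuotes(String key) {
        if (key.length() >= 2 && key.startsWith("\"") && key.endsWith("\"")) {
            return key.substring(1, key.length() - 1);
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, String> table = quotedTable();
        System.out.println(table.get(naiveKey("100")));  // null (no match)
        System.out.println(table.get(quotedKey("100"))); // Customer1
        System.out.println(stripQuotes("\"200\""));      // 200
    }
}
```

Fixing the keys where they are produced is arguably the cleaner option, since otherwise every consumer of the customers topic has to know about the embedded quotes.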

0
votes

@deepak, you may need to materialize your KTable:

builder.table(customers, Materialized.as(customerStore));

Then stream the orders and build your join.