I am facing an issue in joining a KStream with a GlobalKTable and would appreciate your help.
Given the two Kafka topics orders
and customers
:
Orders
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
Customers
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
The requirement is to enrich the stream of orders with the customer name
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
I am trying the following:
- Build a KStream from the
orders
topic - Build a GlobalKTable from the
customers
topic - Build another stream which joins Orders and Customers (look up Order.CustID in the Customer table)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
But it’s not giving any result and does not throw any exception either.
When using a leftJoin
, I am getting a NullPointer exception for Customer.
Please let me know in case you have faced a similar issue and suggest how to fix this.
GlobalKTable
is entirely populated when the stream processing starts. It may be the case that orders are already processing while the customer table is still being populated. To avoid this, start the streams application and only then produce new order events. Also you may need to reset offsets when running your tests multiple times. – user152468GlobalKTable
is bootstrapped to the end of the topic before any processing begins (this is a difference toKTable
s that provide time-synchronized joining/processing whileGlobalKTable
s are not time-synchronized). – Matthias J. Sax