I want to join Customer and Address objects based on the id. These are my inputs to kafka stream for customer topic
{"id": 1,"name": "Yogesh"}
{"id": 2,"name": "Swati" }
{"id": 3,"name": "Shruti"}
{"id": 4,"name": "Amol" }
{"id": 5,"name": "Pooja" }
{"id": 6,"name": "Kiran" }
and following fro address
{"id": 1,"address":"Pune" }
{"id": 2,"address":"Pune" }
{"id": 3,"address":"Pune" }
{"id": 4,"address":"Kalyan"}
{"id": 5,"address": "Pimpri"}
I have use interval join as well JoinFunction using TumblingEventTimeWindows and sliding window but its not joining customer and address streams. I don't understand what i have missed in the code.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = setupEnvironment();
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
init(params);
FlinkKafkaConsumerBase<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(customerTopic,
new SimpleStringSchema(), CommonConfig.getConsumerProperties(ip, port, customerTopic))
.setStartFromEarliest();
DataStream<Customer> customerStream = env.addSource(flinkKafkaConsumer)
.flatMap(new FlatMapFunction<String, Customer>() {
private static final long serialVersionUID = 2142214034515856836L;
@Override
public void flatMap(String value, Collector<Customer> out) throws Exception {
Customer customer = null;
try {
customer = mapper.readValue(value, Customer.class);
} catch (Exception exception) {
System.out.println(exception);
}
if (null != customer) {
out.collect(customer);
}
}
});
customerStream.print();
DataStream<Address> addressStream = env
.addSource(new FlinkKafkaConsumer<>(addressTopic, new SimpleStringSchema(),
CommonConfig.getConsumerProperties(ip, port, addressTopic)).setStartFromEarliest())
.flatMap(new FlatMapFunction<String, Address>() {
private static final long serialVersionUID = 2142214034515856836L;
@Override
public void flatMap(String value, Collector<Address> out) throws Exception {
Address address = null;
try {
address = mapper.readValue(value, Address.class);
} catch (Exception exception) {
System.out.println(exception);
}
if (null != address) {
out.collect(address);
}
}
});
addressStream.print();
customerStream.keyBy(new IdSelectorCustomer()).intervalJoin(addressStream.keyBy(new IdSelectorAddress()))
.between(Time.seconds(-2), Time.seconds(1))
.process(new ProcessJoinFunction<Customer, Address, CustomerInfo>() {
private static final long serialVersionUID = -3658796606815087434L;
@Override
public void processElement(Customer customer, Address address,
ProcessJoinFunction<Customer, Address, CustomerInfo>.Context ctx,
Collector<CustomerInfo> collector) throws Exception {
collector.collect(new CustomerInfo(customer.getId(), customer.getName(), address.getAddress()));
}
}).print();
DataStream<CustomerInfo> joinResultStream = customerStream.join(addressStream).where(new IdSelectorCustomer())
.equalTo(new IdSelectorAddress()).window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Customer, Address, CustomerInfo>() {
private static final long serialVersionUID = -8913244745978230585L;
@Override
public CustomerInfo join(Customer first, Address second) throws Exception {
return new CustomerInfo(first.getId(), first.getName(), second.getAddress());
}
});
joinResultStream.print();
env.execute("Execute");
}
// ===============================================================================
public class IdSelectorAddress implements KeySelector<Address,Long> {
private static final long serialVersionUID = 7642739595630647992L;
@Override
public Long getKey(Address value) throws Exception {
return value.getId();
}
}
// ========================================================================
public class IdSelectorCustomer implements KeySelector<Customer,Long> {
private static final long serialVersionUID = 7642739595630647992L;
@Override
public Long getKey(Customer value) throws Exception {
return value.getId();
}
}