0 votes

I want to join Customer and Address objects based on their id. These are my inputs to the Kafka stream for the customer topic:

{"id": 1,"name": "Yogesh"}
{"id": 2,"name": "Swati" }
{"id": 3,"name": "Shruti"}
{"id": 4,"name": "Amol"  }
{"id": 5,"name": "Pooja" }
{"id": 6,"name": "Kiran" }

and the following for the address topic:

{"id": 1,"address":"Pune" }
{"id": 2,"address":"Pune" }
{"id": 3,"address":"Pune" }
{"id": 4,"address":"Kalyan"}
{"id": 5,"address": "Pimpri"}

I have tried an interval join as well as a JoinFunction using TumblingEventTimeWindows and a sliding window, but neither joins the customer and address streams. I don't understand what I have missed in the code.

public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = setupEnvironment();
            final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(params);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            init(params);

            FlinkKafkaConsumerBase<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(customerTopic,
                    new SimpleStringSchema(), CommonConfig.getConsumerProperties(ip, port, customerTopic))
                            .setStartFromEarliest();

            DataStream<Customer> customerStream = env.addSource(flinkKafkaConsumer)
                    .flatMap(new FlatMapFunction<String, Customer>() {
                        private static final long serialVersionUID = 2142214034515856836L;

                        @Override
                        public void flatMap(String value, Collector<Customer> out) throws Exception {
                            Customer customer = null;
                            try {
                                customer = mapper.readValue(value, Customer.class);
                            } catch (Exception exception) {
                                System.out.println(exception);
                            }
                            if (null != customer) {
                                out.collect(customer);
                            }
                        }
                    });

            customerStream.print();

            DataStream<Address> addressStream = env
                    .addSource(new FlinkKafkaConsumer<>(addressTopic, new SimpleStringSchema(),
                            CommonConfig.getConsumerProperties(ip, port, addressTopic)).setStartFromEarliest())
                    .flatMap(new FlatMapFunction<String, Address>() {
                        private static final long serialVersionUID = 2142214034515856836L;

                        @Override
                        public void flatMap(String value, Collector<Address> out) throws Exception {
                            Address address = null;
                            try {
                                address = mapper.readValue(value, Address.class);
                            } catch (Exception exception) {
                                System.out.println(exception);
                            }
                            if (null != address) {
                                out.collect(address);
                            }
                        }
                    });

            addressStream.print();

            customerStream.keyBy(new IdSelectorCustomer()).intervalJoin(addressStream.keyBy(new IdSelectorAddress()))
                    .between(Time.seconds(-2), Time.seconds(1))
                    .process(new ProcessJoinFunction<Customer, Address, CustomerInfo>() {
                        private static final long serialVersionUID = -3658796606815087434L;
                        @Override
                        public void processElement(Customer customer, Address address,
                                ProcessJoinFunction<Customer, Address, CustomerInfo>.Context ctx,
                                Collector<CustomerInfo> collector) throws Exception {
                            collector.collect(new CustomerInfo(customer.getId(), customer.getName(), address.getAddress()));
                        }
                    }).print();

            DataStream<CustomerInfo> joinResultStream = customerStream.join(addressStream).where(new IdSelectorCustomer())
                    .equalTo(new IdSelectorAddress()).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new JoinFunction<Customer, Address, CustomerInfo>() {
                        private static final long serialVersionUID = -8913244745978230585L;
                        @Override
                        public CustomerInfo join(Customer first, Address second) throws Exception {
                            return new CustomerInfo(first.getId(), first.getName(), second.getAddress());
                        }
                    });
            joinResultStream.print();
            env.execute("Execute");
}

    // ===============================================================================
    public class IdSelectorAddress implements KeySelector<Address,Long> {
        private static final long serialVersionUID = 7642739595630647992L;
        @Override
        public Long getKey(Address value) throws Exception {
            return value.getId();
        }
    }

    // ========================================================================
    public class IdSelectorCustomer implements KeySelector<Customer,Long> {
        private static final long serialVersionUID = 7642739595630647992L;
        @Override
        public Long getKey(Customer value) throws Exception {
            return value.getId();
        }
    }

2 Answers

1 vote

I assume you do not see any results. The reason is that your windows are never fired/evaluated/closed.

Your events do not have timestamps, yet you set TimeCharacteristic.EventTime. Since you do not assign timestamps and watermarks, there is no way for Flink to tell how to join the events in a windowed or interval join.

Use DataStream#assignTimestampsAndWatermarks or FlinkKafkaConsumer#assignTimestampsAndWatermarks to work with event time, or change the time characteristic to ProcessingTime.
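For example, a minimal sketch using the (pre-Flink-1.11) BoundedOutOfOrdernessTimestampExtractor, which matches the TimeCharacteristic-based setup in your code. Your JSON events carry no timestamp field, so the extractor below falls back to the wall clock at extraction time; replace that with a real event-time field if one exists:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<Customer> customerWithTimestamps = customerStream
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Customer>(Time.seconds(2)) {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public long extractTimestamp(Customer customer) {
                            // Assumption: Customer has no event-time field, so we
                            // use the wall clock; ideally return the event's own
                            // timestamp here instead.
                            return System.currentTimeMillis();
                        }
                    });
    // Do the same for addressStream, then run the joins on the
    // timestamped streams instead of the raw ones.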

I hope this will guide you in the right direction.

1 vote

Since you are working with event time, you must use assignTimestampsAndWatermarks on both streams so that Flink can assign events to windows and know when the windows are complete and can be triggered.
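For example, a minimal sketch, assuming Flink 1.11+ where WatermarkStrategy is the recommended API (on older versions, implement a timestamp extractor as in the other answer). The assigner below simply keeps the timestamp the Kafka source already attached to each record:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    WatermarkStrategy<Customer> customerWatermarks = WatermarkStrategy
            .<Customer>forBoundedOutOfOrderness(Duration.ofSeconds(2))
            // Assumption: the POJO has no timestamp field, so we reuse the
            // Kafka record timestamp that the source attached.
            .withTimestampAssigner((customer, recordTimestamp) -> recordTimestamp);

    DataStream<Customer> customerWithWatermarks =
            customerStream.assignTimestampsAndWatermarks(customerWatermarks);
    // Apply an equivalent WatermarkStrategy<Address> to addressStream before joining.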

To read more about this topic, you might start with the Flink tutorial on streaming analytics, or with the documentation on how to implement a timestamp extractor and watermark assigner.