0
votes

I am using Apache Flink and trying to connect to Azure Event Hubs over the Apache Kafka protocol to receive messages. I can connect and receive messages, but I can't use the Flink feature "setStartFromTimestamp(...)" as described here (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration). When I try to read messages starting from a timestamp, Kafka reports that the message format on the broker side is older than 0.10.0. Has anybody else run into this? The Apache Kafka client version is 2.0.1 and the Apache Flink version is 1.7.2.

UPDATE: I tried the Azure Event Hubs quickstart examples (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java). In the consumer package I added the code below to look up offsets by timestamp. It returns null, which is the expected result when the message format version is older than Kafka 0.10.0.

        // Collect every partition of the topic.
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        // Ask for the earliest offset whose timestamp is >= 0 (the epoch) on each partition.
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        // A null value for a partition means the broker found no offset for that timestamp.
        System.out.println(offsetAndTimestamp);
1
Looks like you answered your own post? - OneCricketeer
No, because according to their documentation Azure Event Hubs should support Kafka API version 1.0, which is newer than 0.10.0. I just confirmed that it also fails with their simple example. - Vas9IH
It seems it doesn't, or there is a bug. - Vas9IH

1 Answer

0
votes

Sorry we missed this. Kafka offsetsForTimes() is now supported in Event Hubs (it was previously unsupported).

Feel free to open an issue against our GitHub repository in the future: https://github.com/Azure/azure-event-hubs-for-kafka
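
Even with offsetsForTimes() supported, the map it returns can still hold null for a partition, for example when no message at or after the requested timestamp exists. The following is a minimal sketch of one way to handle that: use the looked-up offset when there is one, and otherwise fall back to another offset for that partition. The class TimestampSeekPlan and its resolve method are hypothetical helpers written for illustration; they are not part of Kafka, Flink, or the Event Hubs quickstart. The sketch is generic and uses plain strings and longs so that it runs without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical helper for illustration only; not a Kafka or Flink API.
class TimestampSeekPlan {

    /**
     * Picks a start offset per partition: the looked-up offset when the
     * lookup returned one, otherwise the fallback offset for that partition.
     */
    static <P, O> Map<P, Long> resolve(Map<P, O> lookup,
                                       ToLongFunction<O> offsetOf,
                                       Map<P, Long> fallback) {
        Map<P, Long> start = new LinkedHashMap<>();
        for (Map.Entry<P, O> entry : lookup.entrySet()) {
            O found = entry.getValue();
            if (found != null) {
                // The lookup produced an offset for this partition.
                start.put(entry.getKey(), offsetOf.applyAsLong(found));
            } else {
                // Null result: use the fallback, and fail loudly if none exists.
                Long fb = fallback.get(entry.getKey());
                if (fb == null) {
                    throw new IllegalStateException("No fallback offset for " + entry.getKey());
                }
                start.put(entry.getKey(), fb);
            }
        }
        return start;
    }

    public static void main(String[] args) {
        // Stand-in data: strings play the role of partitions, longs the role of offsets.
        Map<String, Long> lookup = new LinkedHashMap<>();
        lookup.put("topic-0", 42L);
        lookup.put("topic-1", null); // simulates a partition with no match
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("topic-0", 100L);
        endOffsets.put("topic-1", 7L);
        System.out.println(resolve(lookup, Long::longValue, endOffsets));
        // prints {topic-0=42, topic-1=7}
    }
}
```

With the real client, one would presumably pass the result of consumer.offsetsForTimes(...) as lookup, OffsetAndTimestamp::offset as offsetOf, and the result of consumer.endOffsets(...) as fallback, then call consumer.seek(partition, offset) for each entry. Falling back to the end offsets is only one possible choice; whether skipping to the end or starting from the beginning is right depends on the application.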