
I'm using Flink streaming and flink-connector-kafka to process data from Kafka. I configure FlinkKafkaConsumer010 with setStartFromTimestamp(1586852770000L) at a moment when all the data in Kafka topic A has timestamps before 1586852770000L. I then send some messages to partition-0 and partition-4 of topic A (topic A has 6 partitions; the current system time is already after 1586852770000L), but my Flink program doesn't consume any data from topic A. Is this an issue?

If I stop my Flink program and restart it, it consumes data from partition-0 and partition-4 of topic A, but it still won't consume any data from the other 4 partitions when I send data to them, unless I restart my Flink program yet again.

The Kafka client log is as follows:

2020-04-15 11:48:46,447 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-4=1586836800000}, minVersion=1) to broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,466 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 185, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=4,error_code=0,timestamp=1586852770000,offset=4}]}]} from broker server1:9092 (id: 185 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-4. Fetched offset 4, timestamp 1586852770000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-0=1586836800000}, minVersion=1) to broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,463 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=0,timestamp=1586836800000}]}]} to node 184.
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 184, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]}
2020-04-15 11:48:46,467 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=0,error_code=0,timestamp=1586863210000,offset=47}]}]} from broker server2:9092 (id: 184 rack: null)
2020-04-15 11:48:46,467 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-0. Fetched offset 47, timestamp 1586863210000


2020-04-15 11:48:46,448 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={TopicA-2=1586836800000}, minVersion=1) to broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,465 TRACE org.apache.kafka.clients.NetworkClient                        - Sending {replica_id=-1,topics=[{topic=TopicA,partitions=[{partition=2,timestamp=1586836800000}]}]} to node 183.
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.NetworkClient                        - Completed receive from node 183, for key 2, received {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]}
2020-04-15 11:48:46,468 TRACE org.apache.kafka.clients.consumer.internals.Fetcher           - Received ListOffsetResponse {responses=[{topic=TopicA,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=-1}]}]} from broker server3:9092 (id: 183 rack: null)
2020-04-15 11:48:46,468 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher           - Handling ListOffsetResponse response for TopicA-2. Fetched offset -1, timestamp -1

2020-04-15 11:48:46,481 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 2 partitions from timestamp 1586836800000: [KafkaTopicPartition{topic='TopicA', partition=4}, KafkaTopicPartition{topic='TopicA', partition=0}]

From the log, the offsets for all partitions except partition-0 and partition-4 are -1. Why is the returned offset -1 instead of the latest offset?

In the Kafka client's code (Fetcher.java, lines 674-680):

// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
   OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
   timestampOffsetMap.put(topicPartition, offsetData);
}

The value of ListOffsetResponse.UNKNOWN_OFFSET is -1, so the other 4 partitions are filtered out and the Kafka consumer will not consume data from them, even though the documentation quoted below says such partitions should simply be read from the latest record.
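
To reproduce the lookup outside of Flink, here is a minimal sketch (mine, not part of the original question) that calls the plain Kafka consumer's offsetsForTimes for all 6 partitions. Partitions with no record at or after the given timestamp come back as null, which is what the Fetcher logs as offset -1:

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object OffsetsForTimesCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "offsets-for-times-check")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // ask for the first offset at or after the timestamp, for all 6 partitions
      val query = (0 until 6)
        .map(p => new TopicPartition("message", p) -> java.lang.Long.valueOf(1586952000000L))
        .toMap.asJava
      consumer.offsetsForTimes(query).asScala.foreach { case (tp, oat) =>
        // oat is null when the partition has no record at or after the timestamp,
        // i.e. the broker returned offset -1 (ListOffsetResponse.UNKNOWN_OFFSET)
        val result = Option(oat)
          .map(o => s"offset=${o.offset}, timestamp=${o.timestamp}")
          .getOrElse("no offset (null)")
        println(s"$tp -> $result")
      }
    } finally consumer.close()
  }
}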

My Flink version is 1.9.2 and the Flink Kafka connector is:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
   <version>1.9.2</version>
</dependency>

The Flink Kafka connector documentation describes setStartFromTimestamp as follows:

setStartFromTimestamp(long): Start from the specified timestamp. For each partition, the record whose timestamp is larger than or equal to the specified timestamp will be used as the start position. If a partition’s latest record is earlier than the timestamp, the partition will simply be read from the latest record.

Test program code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.junit.Test

class TestFlinkKafka {

  @Test
  def testFlinkKafkaDemo: Unit ={
    //1. set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic( TimeCharacteristic.ProcessingTime)
    // To use fault tolerant Kafka Consumers, checkpointing needs to be enabled at the execution environment
    env.enableCheckpointing(60000)
    //2. kafka source
    val topic = "message"
    val schema = new SimpleStringSchema()
    //server1:9092,server2:9092,server3:9092
    val props = getKafkaConsumerProperties("localhost:9092","flink-streaming-client", "latest")
    val consumer = new FlinkKafkaConsumer010(topic, schema, props)
    // consume data starting from the offset for a specific timestamp
    // 2020/4/14 20:00:00
    //consumer.setStartFromTimestamp(1586865600000L)
    // 2020/4/15 20:00:00
    consumer.setStartFromTimestamp(1586952000000L)
    consumer.setCommitOffsetsOnCheckpoints(true)

    //3. transform
    val stream = env.addSource(consumer)
      .map(x => x)

    //4. sink
    stream.print()

    //5. execute
    env.execute("testFlinkKafkaConsumer")

  }

  def getKafkaConsumerProperties(brokerList:String, groupId:String, offsetReset:String): Properties ={
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokerList)
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", offsetReset)
    props.setProperty("flink.partition-discovery.interval-millis", "30000")
    props
  }

}

Set the log level for Kafka:

log4j.logger.org.apache.kafka=TRACE

Create the Kafka topic:

kafka-topics --zookeeper localhost:2181/kafka --create --topic message --partitions 6 --replication-factor 1

Send messages to the Kafka topic:

kafka-console-producer --broker-list localhost:9092 --topic message

{"name":"tom"}
{"name":"michael"}
What version of Kafka is running on your Kafka servers? – David Anderson

My Kafka broker version is 1.0.1. I also tried it on Kafka broker 0.10.2 and it still has this problem. – michael_xu

I have added some simple test code. – michael_xu

Can you test with servers running 1.0.1, but using FlinkKafkaConsumer rather than FlinkKafkaConsumer010? You'll need to use flink-connector-kafka_2.11. If that still fails, then I think this might be a bug. – David Anderson

According to the instructions at the beginning of the documentation, Kafka server 0.10.x should be supported with the 0.10.x connector. It does not say that setStartFromTimestamp(long) only applies to Kafka 1.0.0 and above with the universal connector. The same problem still occurs when using Kafka server 0.10.x with the 0.10.x connector. We use the 0.10.x connector because our customer's production Kafka cluster runs 0.10.x, and persuading them to upgrade is not an easy task... – michael_xu

1 Answer


This problem was resolved by upgrading the Flink/Kafka connector to the newer, universal connector, FlinkKafkaConsumer, available from flink-connector-kafka_2.11. This version of the connector is recommended for all versions of Kafka from 1.0.0 forward. With Kafka 0.10.x or 0.11.x, it is better to use the version-specific flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.11_2.11 connectors. (In all cases, substitute 2.12 for 2.11 if you are using Scala 2.12.)
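
For completeness, a minimal sketch of that change against the question's test program, assuming Flink 1.9.2 and Scala 2.11 (only the artifact and the connector class change):

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_2.11</artifactId>
   <version>1.9.2</version>
</dependency>

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// same topic, schema, and props as in the test program above
val consumer = new FlinkKafkaConsumer(topic, schema, props)
consumer.setStartFromTimestamp(1586952000000L)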

See the Flink documentation for more information on Flink's Kafka connector.