14
votes

I'm trying to connect Flink to a Kafka consumer

I'm using Docker Compose to build 4 containers zookeeper, kafka, Flink JobManager and Flink TaskManager.

For zookeeper and Kafka I'm using wurstmeister images, and for Flink I'm using the official image.

docker-compose.yml

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'

  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
      BOOTSTRAP_SERVER: kafka:9092
      ZOOKEEPER: zookeeper:2181

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    links:
      - jobmanager
      - zookeeper
      - kafka
    depends_on:
      - jobmanager
    command: taskmanager
    # links:
    #   - "jobmanager:jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

And When I submit a simple job to Dispatcher the Job fails with the following error:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined

My Job code is:

public class Main {
    public static void main( String[] args ) throws Exception
    {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        Properties properties = new Properties();
        String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
        String zookeeperServer = System.getenv("ZOOKEEPER");

        if (bootstrapServer == null) {
            System.exit(1);
        }

        properties.setProperty("zookeeper", zookeeperServer);
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<String>("pipeline", new SimpleStringSchema(), properties);
        // kafkaConsumer.setStartFromGroupOffsets();
        kafkaConsumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Defining Pipeline here

        // Printing Outputs
        stream.print();

        env.execute("Stream Pipeline");
    }
}
2
Did you resolve your issue? I'm having the same problem, but I'm not using docker, just plain flink. It's happening consistently on some topics while some other are fine. Resetting offset doesn't seem to help.0x26res

2 Answers

2
votes

I know I'm late to the party but I had the exact same error. In my case, I was not setting up TopicPartitions correctly. My topic had 2 partitions and my producer was producing messages just fine, but it's the spark streaming application, as my consumer, that wasn't really starting and giving up after 60 secs complaining the same error.

Wrong code that I had -

List<TopicPartition> topicPartitionList = Arrays.asList(new topicPartition(topicName, Integer.parseInt(numPartition)));

Correct code -

List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();

for (int i = 0; i < Integer.parseInt(numPartitions); i++) {
    topicPartitionList.add(new TopicPartition(topicName, i));
}
0
votes

I had an error that looks the same.

17:34:37.668 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-3, groupId=api.dev] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition aaa-1 could be determined

Turns out it's my hosts file has been changed so the broker address is wrong.

Try this log settings to debug more details.

<logger name="org.apache.kafka.clients.consumer.internals.Fetcher" level="info" />