
Flink Version 1.9.0

Scala Version 2.11.12

Kafka Cluster Version 2.3.0

I am trying to connect a Flink job I made to a Kafka cluster that has 3 partitions. I have tested my job against a Kafka topic running on my localhost that has one partition, and it can both read from and write to the local Kafka. When I attempt to connect to a topic that has multiple partitions, I get the following error (topicName is the name of the topic I am trying to consume). Weirdly, I don't have any issues when I am trying to produce to a multi-partition topic.

java.lang.RuntimeException: topicName
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:80)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

My consumer code looks like this:

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

def defineKafkaDataStream[A: TypeInformation](topic: String,
                                              env: StreamExecutionEnvironment,
                                              SASL_username: String,
                                              SASL_password: String,
                                              kafkaBootstrapServer: String = "localhost:9092",
                                              zookeeperHost: String = "localhost:2181",
                                              groupId: String = "test"
                                             )(implicit c: JsonConverter[A]): DataStream[A] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
  properties.setProperty("security.protocol", "SASL_SSL")
  properties.setProperty("sasl.mechanism", "PLAIN")
  // build the JAAS config string for SASL/PLAIN authentication
  val jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";"
  val jaasConfig = String.format(jaasTemplate, SASL_username, SASL_password)
  properties.setProperty("sasl.jaas.config", jaasConfig)
  properties.setProperty("group.id", groupId) // was hardcoded to "MyConsumerGroup"; use the parameter

  env
    .addSource(new FlinkKafkaConsumer(topic, new JSONKeyValueDeserializationSchema(true), properties))
    .map(x => x.convertTo[A](c))
}
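
For context, here is roughly how I invoke it (a minimal sketch; Event and its JsonConverter instance are placeholders for my actual payload type and broker addresses):

// Hypothetical call site: Event stands in for my real payload type,
// and an implicit JsonConverter[Event] is assumed to be in scope.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val events: DataStream[Event] =
  defineKafkaDataStream[Event](
    topic = "topicName",
    env = env,
    SASL_username = "user",
    SASL_password = "pass",
    kafkaBootstrapServer = "broker1:9092,broker2:9092,broker3:9092"
  )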

Is there another property I should be setting to allow a single job to consume from multiple partitions?

1
I am just guessing, but could it be that this has nothing to do with multiple partitions but with the authentication at the cluster? The discovery of the partitions might be the first request which fails because of that... - TobiSH
From the documentation it seems that the org.apache.kafka.common.errors.AuthenticationException should be thrown when the authentication fails. - Dominik Wosiński
@TobiSH I had authentication errors earlier and those had a much different error message. Also, I am able to sink messages from Flink to that same stream, so I think my authentication code works fine. - huntingData

1 Answer


After digging around and questioning everything in my process, I found the issue.

I looked at the Java code of the KafkaPartitionDiscoverer class named in the runtime exception.

One section I noticed handles the RuntimeException:

if (kafkaPartitions == null) {
    throw new RuntimeException("Could not fetch partitions for %s. Make sure that the topic exists.".format(topic));
}

(As an aside, "...".format(topic) in Java resolves to the static String.format(topic), so the intended message is discarded and the exception text ends up being just the bare topic name, which matches the error above.)

I was working off of a Kafka cluster that I don't maintain, and I had been given a topic name that I did not verify first. When I did verify it using:

kafka-topics --describe --zookeeper serverIP:2181 --topic topicName

It returned the following response:

Error while executing topic command : Topics in [] does not exist
ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:435)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:350)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

After I got the correct topic name, everything worked.
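
If you want to catch this earlier next time, you can run the same check Flink's partition discoverer does (partitionsFor returning null) before submitting the job. A rough sketch, assuming the same SASL settings as the consumer; assertTopicExists is just an illustrative helper name, not a Flink or Kafka API:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

// Fail fast with a clear message if the topic is missing,
// mirroring the null check inside KafkaPartitionDiscoverer.
def assertTopicExists(topic: String, bootstrapServers: String): Unit = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", bootstrapServers)
  props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  // add the same security.protocol / sasl.* properties you use for the Flink source
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = consumer.partitionsFor(topic)
    require(partitions != null && !partitions.isEmpty, s"Topic '$topic' does not exist")
  } finally consumer.close()
}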