5
votes

I want to read a Kafka topic from Flink:

package Toletum.pruebas;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LeeKafka {
  public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>("test02",
        new SimpleStringSchema(),
        parameterTool.getProperties());

    DataStream<String> messageStream = env.addSource(kafkaSrc);

    messageStream.rebalance().map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
  
      public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
      }
    }).print();

    env.execute("LeeKafka");
  }

}

This code works when I run it directly with java:

java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

But when I try to run it through Flink:

flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

I get an error:

java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
        at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
2
Could it be that the Flink version you compiled your job against differs from the Flink version running on the cluster? - Till Rohrmann
Thanks. I was using an old version in pom.xml. - Carlos

2 Answers

2
votes

I was using an old version of the library.

Correct pom.xml:



<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10.1</version>
</dependency>

1
votes

This problem is caused by using an old version of the Flink Kafka connector library. The connector version in your pom.xml has to match the Flink version running on the cluster.

Check the Flink documentation for the Kafka connector and use the Maven dependency listed there. The right connector artifact also depends on the Kafka version you are using.

At the time of writing, the dependency for Kafka 0.8 is:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.3.2</version>
</dependency>
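
To confirm a version mismatch like the one above before changing the pom.xml, a small diagnostic can help. The following is a minimal sketch, not part of Flink: the class name `ClasspathCheck` and its helper methods are hypothetical, and the defaults are simply the class and method named in the stack trace. It prints which jar a class is loaded from and the full signatures of the public methods with a given name, using only standard Java reflection.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: reports where a class comes from
// and which public overloads of a method it actually offers.
public class ClasspathCheck {

    /** Where the class was loaded from, or a short note if that is unknown. */
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            return src == null ? "(no code source)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    /** Full signatures of all public methods with the given name. */
    static List<String> signatures(String className, String methodName) {
        List<String> found = new ArrayList<String>();
        try {
            for (Method m : Class.forName(className).getMethods()) {
                if (m.getName().equals(methodName)) {
                    found.add(m.toString());
                }
            }
        } catch (ClassNotFoundException e) {
            // Leave the list empty: the class is not visible at all.
        }
        return found;
    }

    public static void main(String[] args) {
        // Defaults are taken from the stack trace in the question.
        String cls = args.length > 0 ? args[0] : "org.apache.flink.util.NetUtils";
        String method = args.length > 1 ? args[1] : "getHostnamePort";
        System.out.println(cls + " loaded from " + locate(cls));
        for (String sig : signatures(cls, method)) {
            System.out.println("  " + sig);
        }
    }
}
```

If this class is packaged into the job jar, running it once with `java -cp` and once with `flink run -c ClasspathCheck` should show whether the two environments load `NetUtils` from different jars. If the return type printed for `getHostnamePort` is not `java.net.URL`, that would be consistent with the `NoSuchMethodError` in the question.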