
I'm trying to run a simple Apache Flink script with Kafka integration, but I keep running into problems during execution. The script should read messages coming from a Kafka producer, process them, and then send the result of the processing to another topic. I got this example from here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td4828.html

The error I have is:

Exception in thread "main" java.lang.NoSuchFieldError: ALL
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:429)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:46)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:33)

This is my code:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "javaflink");

        // Read from the "test" topic
        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));
        System.out.println("Step D");
        // Prefix each message and write the result to the "demo2" topic
        messageStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "Blablabla " + value;
            }
        }).addSink(new FlinkKafkaProducer010<String>("localhost:9092", "demo2", new SimpleStringSchema()));
        env.execute();
    }
}

These are the pom.xml dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java_2.11</artifactId>
    <version>0.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

What could cause this kind of error?

Thanks, Luca

It is not an error. It is just a warning. Your job should work anyway. - Dawid Wysakowicz

1 Answer


The problem is most likely caused by the mixture of different Flink versions (0.9.1, 0.10.2, and 1.3.1) you have defined in your pom.xml: a java.lang.NoSuchFieldError at runtime is a typical symptom of classes compiled against incompatible versions of the same library ending up on the classpath together. In order to run this program, it should be enough to include the following dependencies:

<!-- Streaming API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- In order to execute the program from within your IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

<!-- Kafka connector dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
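
To avoid this class of problem in the future, you can declare the Flink version once as a Maven property and reference it from every Flink dependency, so all artifacts stay in lockstep. A sketch (the property name `flink.version` is just a convention, not required by Flink):

```xml
<properties>
    <!-- Single source of truth for the Flink version -->
    <flink.version>1.3.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

You can then check which versions Maven actually resolves with `mvn dependency:tree` and confirm that only one Flink version appears.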