0 votes

I'm creating a streaming job that reads from two Kafka topics and writes to one Kafka topic. I'm working with these versions: Flink 1.4.1, kafka_2.11-1.0.0, and flink-connector-kafka-0.11_2.11.

Sometimes (not systematically), I get this log:

KafkaThread.6648.1 - - |43| Uncaught exception in kafka-producer-network-thread | agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943: 
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1 
        at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:583) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:705) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:443) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161] 
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.NetworkClient$1 
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_161] 
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_161] 
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) ~[flink-dist_2.11-1.4.1.jar:1.4.1]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_161] 
        ... 6 common frames omitted

Here agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943 is the client id of my producer.

I have several jobs running at the same time. This log can appear for one job and not the others. I have no problem with consumers. It seems this log can appear even when the job does not write to the Kafka topic.
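For context, the job wiring looks roughly like this. This is only a sketch of the topology described above: the topic names, broker address, and the enrich() transformation are placeholders, not the asker's actual code.

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class TwoTopicJob {

    // Placeholder record transformation, kept pure so it can be tested in isolation.
    static String enrich(String value) {
        return "AGR:" + value;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-host:9092"); // placeholder
        props.setProperty("group.id", "agr-consumer-group");       // placeholder

        // A single consumer instance can subscribe to both input topics.
        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer011<>(
                        Arrays.asList("topic-in-1", "topic-in-2"),
                        new SimpleStringSchema(), props));

        DataStream<String> output = input.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return enrich(value);
            }
        });

        // Sink to the single output topic; the kafka-producer-network-thread
        // that this producer spawns is where the NoClassDefFoundError is logged.
        output.addSink(new FlinkKafkaProducer011<>(
                "kafka-host:9092", "topic-out", new SimpleStringSchema()));

        env.execute("two-topics-to-one");
    }
}
```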

My pom.xml looks like this:

<properties>
    <flink.version>1.4.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <phase>install</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>target/lib</outputDirectory>
                        <overWriteReleases>false</overWriteReleases>
                        <overWriteSnapshots>true</overWriteSnapshots>
                        <excludeTransitive>true</excludeTransitive>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>${maven.shade.version}</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder. Otherwise, 
                                        this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>logback.xml</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mypackage.MyClass</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

I would like to understand why this log appears, how severe it is, and how I can fix it. Thanks.

How do you run your job? Do you build a fat jar and deploy it to a standalone cluster? – TobiSH
I can only see a dependency on Kafka 0.11 (transitive via flink-connector-kafka-0.11_2.11). How do you get Kafka 1.0? – TobiSH
I run my job like this: bin/flink run -d myApp.jar [my args] 2>&1 >> "my_app.log". I confirm that I build a fat jar and deploy it to a standalone cluster. About Kafka: the remote server version is 1.0, and the client version I use in my streaming job is 0.11 (see pom.xml). I did not override the Kafka connector version in my pom.xml. – Julien Preisner
@JulienPreisner You might check out this article on how to debug ClassNotFoundException in Flink jobs: heapanalytics.com/blog/engineering/… – Sayat Satybald
@SayatSatybald This article is very interesting. I conclude that it is necessary to use RichSinkFunction to close clients properly. However, FlinkKafkaProducer011 already implements RichSinkFunction (ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/…). I don't know what to think... – Julien Preisner
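For reference, the "close clients in RichSinkFunction" pattern discussed in the comments looks roughly like this. This is a hypothetical sink with a stand-in client object, not the asker's code; FlinkKafkaProducer011 already extends RichSinkFunction and closes its producer internally, so the pattern matters mainly for hand-written sinks.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink that owns a client and releases it in close(), while the
// task's user-code classloader is still available.
public class ManagedClientSink extends RichSinkFunction<String> {

    // Stand-in for a real client object (e.g. a KafkaProducer or JDBC connection).
    private transient StringBuilder client;

    @Override
    public void open(Configuration parameters) {
        client = new StringBuilder(); // e.g. create the connection here
    }

    @Override
    public void invoke(String value) {
        client.append(value); // e.g. send the record
    }

    @Override
    public void close() {
        client = null; // e.g. client.close(); runs before the classloader is released
    }

    // Exposed for testing: whether the client is currently held.
    boolean hasClient() {
        return client != null;
    }
}
```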

1 Answer

0 votes

This error most likely happens because the client cannot connect to the Kafka broker, and the reason might be:

  • Your Kafka bootstrap server is not running.

  • Your Kafka bootstrap server is on a different machine and your machine's IP address is not whitelisted on the Kafka server.

Try running a local Kafka instance and connecting to it to see whether the error persists. Also try running ZooKeeper on all the nodes that run Flink.