3
votes

I am currently working on an Flink application that uses some of the Hadoop dependencies to write the data to S3 location. On local environment it is working fine, however when I deploy this Flink application on EMR cluster it throws an exception related to compatibility issue.

The error message that I am getting is

java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency. at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025) at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649) at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591) at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778) ....

I have included the maven dependency of flink-hadoop-compatibility-2.10 jar in POM dependency. But it is not detecting it. The Flink version I am using is 1.2.0

However, when I explicitly copy the compatibility JAR to the ${FLINK-HOME}/lib location, I am not getting any exception and able to run the Flink application successfully.


Is there any way that we can use, so that without deploying the JAR file to ${FLINK-HOME}/lib we can run the application?

OR

What modifications required in POM dependencies, so that the application will detect it and it is not required to copy the compatibility JAR to flink-home/lib location?

2
Answer depends what initial exception actually saysOneCricketeer
@cricket_007 Here is the error message that I am getting java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency. at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)Avinash
Also the Flink version that I am using is 1.2.0Avinash
Have you tried making an uber/shaded jar for your Flink application that includes that library?OneCricketeer
@cricket_007 yes I have the maven shaded plugin configured in the POM, that includes the flink-hadoop-compatibility jar while creating the shaded jar. Also as per the link "apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/…" that suggest to add flink-shaded-hadoop2 dependency in POM, I found that this jar also getting included into the shaded jar.Avinash

2 Answers

2
votes
package org.apache.flink.api.java.typeutils;

public class TypeExtractor {

    /** The name of the class representing Hadoop's writable */
    private static final String HADOOP_WRITABLE_CLASS = "org.apache.hadoop.io.Writable";
    private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";


    // visible for testing
    public static <T> TypeInformation<T> createHadoopWritableTypeInfo(Class<T> clazz) {
        checkNotNull(clazz);

        Class<?> typeInfoClass;
        try {
            typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not load the TypeInformation for the class '"
                    + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
        }

        ...
    }
}

This is because org.apache.hadoop.io.Writable is mean to be loaded by TypeExtractor.class.getClassLoader() which is AppClassLoader, and the submited flink jar is loaded by ParentFirstClassLoader, which is the child of AppClassLoader, so AppClassLoader can not load org.apache.hadoop.io.Writable from your flink jar.

I'm not sure if it's a bug, change to classLoader to Thread.currentThread().getContextClassLoader() will make it work without copy the flink-hadoop-compatibility jar file to ${FLINK-HOME}/lib location.

1
votes

After looking into various posts and experimenting with POM files, I think with current version of Apache Flink (1.2.0) it is required to copy (deploy) the JAR file to ${FLINK-HOME}/lib location.