0
votes

I'm struggling to integrate HDFS with Flink.

Scala binary version: 2.12,

Flink (cluster) version: 1.10.1

here is HADOOP_CONF_DIR;

and configuration of hdfs is here;

This configuration and HADOOP_CONF_DIR are the same on the taskmanager as well.

pom.xml;

<!-- Maven dependencies of the job jar (Flink Table API, Parquet format, Hadoop client libraries). -->
<dependencies>
    <!-- Flink Table API / planner / streaming artifacts. All four are declared with scope
         "provided", so they are not packaged into the job jar. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- The remaining dependencies declare no scope, so Maven's default (compile) scope applies. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- NOTE(review): hadoop-client is pinned to 2.8.3 here while hadoop-hdfs further down is
         pinned to 2.10.0; confirm that mixing Hadoop versions is intended. -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.3</version>
    </dependency>
    <!-- Parquet format support for Flink. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.10.0</version>
    </dependency>
</dependencies>

All I'm trying to do is read Parquet files from HDFS; my sample code is here:

    // Batch (DataSet API) environment with the Table API bridge layered on top of it.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    // Parquet schema handed to the table source: six required columns in a message
    // named "AppendTest".
    Types.MessageTypeBuilder builder = Types.buildMessage();

    MessageType messageType = builder
            .required(INT64).named("column1")
            .required(BINARY).as(UTF8).named("column2")
            .required(INT64).named("column3")
            .required(INT64).named("column4")
            .required(BINARY).as(UTF8).named("column5")
            .required(BINARY).named("column6")
            .named("AppendTest");

    // NOTE(review): hadoopConf is not defined anywhere in this snippet; presumably it is a
    // Hadoop Configuration created elsewhere - confirm it is in scope.
    ParquetTableSource parquetTableSource = ParquetTableSource.builder()
        .path("hdfs://hdfs:8020/historic_data/data.parquet")
        .withConfiguration(hadoopConf)
        .forParquetSchema(messageType)
        .build();

    tEnv.registerTableSource("datatable", parquetTableSource);

    Table table = tEnv.sqlQuery("select * from datatable");

    DataSet<Row> tempDataSet = tEnv.toDataSet(table, Row.class);

    // DataSet#print() is an eager sink: it triggers execution of the job by itself.
    // It must not be followed by env.execute(): no new sink is defined after print(), so
    // that call would fail with "No new data sinks have been defined since the last
    // execution".
    tempDataSet.print();

Here is the error:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:271)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:807)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:256)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
... 27 more

The odd part is that, as you can see, the Hadoop uber jar is under the lib folder.

And this is how I submit the job:

docker exec -it jobmanager env HADOOP_CONF_DIR=/tmp/hadoopconf flink run -C file:///opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -d /tmp/core-batch-1.0.0.jar

I also tried submitting the job through the Flink UI, but the result is the same.

any help would be appreciated.

Thanks.

1

1 Answer

0
votes

@Aykut, I am not sure how helpful this may be, but I have recently been testing Flink 1.12 delivered to HDP3 via a custom ambari service I created from the Ambari Flink service on GitHub. When I install Flink into a base ambari cluster that has yarn/hdfs installed, everything works perfectly with all the dependencies. Once the yarn flink app is running in the yarn cluster, the rest of my application testing worked great. Reference Flink on Yarn.

Here are my Flink user's environment variables:

export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.4.0-315/hadoop/conf:/usr/hdp/3.1.4.0-315/hadoop/lib/*:/usr/hdp/3.1.4.0-315/hadoop/.//*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/./:/usr/hdp/3.1.4.0-315/hadoop-hdfs/lib/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/.//*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/lib/*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/.//*:/usr/hdp/3.1.4.0-315/hadoop-yarn/./:/usr/hdp/3.1.4.0-315/hadoop-yarn/lib/*:/usr/hdp/3.1.4.0-315/hadoop-yarn/.//*:/usr/hdp/3.1.4.0-315/tez/*:/usr/hdp/3.1.4.0-315/tez/lib/*:/usr/hdp/3.1.4.0-315/tez/conf:/usr/hdp/3.1.4.0-315/tez/conf_llap:/usr/hdp/3.1.4.0-315/tez/doc:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-2.8-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib:/usr/hdp/3.1.4.0-315/tez/man:/usr/hdp/3.1.4.0-315/tez/tez-api-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-common-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-dag-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-examples-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-history-parser-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-javadoc-tools-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-job-analyzer-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-mapreduce-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-protobuf-history-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-internals-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-library-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-tests-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/ui:/usr/hdp/3.1.4.0-315/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-
cli-1.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/gcs-connector-1.9.10.3.1.4.0-315-shaded.jar:/usr/hdp/3.1.4.0-315/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-aws-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-datalake-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-hdfs-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.4.0-315/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.4.0-315/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.4.0-315/tez/lib/tez.tar.gz;

Here is my execution command:

/opt/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/bin/flink run javaProgram.jar

Here is sample java code (ingest s3 file to hdfs via Flink):

package org.myorg.quickstart;

import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Sample Flink streaming job that reads a text file from S3 and writes it as text to HDFS,
 * overwriting the target if it already exists.
 */
public class S3FlinkIngest {

    /** Location of the input file read by the job. */
    private static final String SOURCE_PATH = "s3a://s3Bucket/input.xml";

    /** Location the job writes its text output to. */
    private static final String SINK_PATH = "hdfs://hadoopHost:8020/user/flink/ingest.xml";

    /**
     * Builds the source-to-sink pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env.readTextFile(SOURCE_PATH);
        lines.writeAsText(SINK_PATH, org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);

        // Nothing runs until execute() is called; the statements above only define the pipeline.
        env.execute("S3 Flink Ingest");
    }
}