
I am trying to get a Beam pipeline to run on Azure HDInsight's Spark runner. I first tried a cluster based on Spark 2.3.0/Hadoop 2.7 (HDI 3.6) and then also 2.3.1/Hadoop 3.0 (HDI 4.0 Preview). I tried Apache Beam 2.2.0 first and then 2.10.0-SNAPSHOT.

The spark-submit command is (for Beam 2.10.0):

JARS="wasbs:///dependency/hadoop-azure-3.1.1.3.0.2.0-50.jar,wasbs:///dependency/azure-storage-7.0.0.jar,wasbs:///dependency/beam-model-fn-execution-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-model-job-management-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-model-pipeline-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-core-construction-java-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-core-java-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-direct-java-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-runners-spark-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-sdks-java-core-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-sdks-java-fn-execution-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-sdks-java-io-hadoop-file-system-2.10.0-SNAPSHOT.jar,wasbs:///dependency/beam-vendor-grpc-1_13_1-0.1.jar"

spark-submit --conf spark.yarn.maxAppAttempts=1 --deploy-mode cluster --master yarn --jars $JARS --class example.MinimalWordCountJava8 wasbs:///mavenproject1-1.0-SNAPSHOT.jar --runner=SparkRunner

(Initially the --jars list did not include the hadoop-azure and azure-storage jars, but adding them did not make any difference.)

The main() looks like this:

public static void main(String[] args) {

    JavaSparkContext ct = new JavaSparkContext();
    Configuration config = ct.hadoopConfiguration();

    config.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
    config.set("fs.wasb.impl",  "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
    config.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb");
    config.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasbs");
    config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
    config.set("fs.azure.account.key." + account + ".blob.core.windows.net", key);
    config.set("fs.defaultFS", "wasb://" + container + "@" + account + ".blob.core.windows.net");

    System.out.println("### hello.txt content:");
    JavaRDD<String> content = ct.textFile("wasbs:///hello.txt");
    // collect() forces the read; toString() on the lazy RDD would not touch the file
    content.collect().forEach(System.out::println);

    System.out.println("### MinimalWordCountJava8");

    PipelineOptions options = PipelineOptionsFactory.create();
    SparkContextOptions sparkContextOptions = options.as(SparkContextOptions.class);
    sparkContextOptions.setUsesProvidedSparkContext(true);
    sparkContextOptions.setProvidedSparkContext(ct);
    sparkContextOptions.setRunner(SparkRunner.class);

    Pipeline p = Pipeline.create(sparkContextOptions);

    p.apply(TextIO.read().from("hello.txt"))
     .apply(FlatMapElements
         .into(TypeDescriptors.strings())
         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
     .apply(Filter.by((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .into(TypeDescriptors.strings())
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     // CHANGE 3/3: in the original Beam example this was a Google Cloud Storage output path.
     .apply(TextIO.write().to("output"));

    p.run().waitUntilFinish();
}

It fails at the call to Pipeline.create(sparkContextOptions) with this exception trace:

18/12/09 14:47:10 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Failed to construct Hadoop filesystem with configuration Configuration: /usr/hdp/3.0.2.0-50/hadoop/conf/core-site.xml, /usr/hdp/3.0.2.0-50/hadoop/conf/hdfs-site.xml
java.lang.IllegalArgumentException: Failed to construct Hadoop filesystem with configuration Configuration: /usr/hdp/3.0.2.0-50/hadoop/conf/core-site.xml, /usr/hdp/3.0.2.0-50/hadoop/conf/hdfs-site.xml
    at org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar.fromOptions(HadoopFileSystemRegistrar.java:59)
    at org.apache.beam.sdk.io.FileSystems.verifySchemesAreUnique(FileSystems.java:489)
    at org.apache.beam.sdk.io.FileSystems.setDefaultPipelineOptions(FileSystems.java:479)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:47)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:145)
    at io.aptly.mavenproject1.MinimalWordCountJava8.main(MinimalWordCountJava8.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "wasbs"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3332)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
    at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3377)
    at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:530)
    at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:542)
    at org.apache.beam.sdk.io.hdfs.HadoopFileSystem.<init>(HadoopFileSystem.java:82)
    at org.apache.beam.sdk.io.hdfs.HadoopFileSystemRegistrar.fromOptions(HadoopFileSystemRegistrar.java:56)
    ... 10 more
18/12/09 14:47:10 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalArgumentException: Failed to construct Hadoop filesystem with configuration Configuration: /usr/hdp/3.0.2.0-50/hadoop/conf/core-site.xml, /usr/hdp/3.0.2.0-50/hadoop/conf/hdfs-site.xml) [same stack trace as above]

The submit itself works (the wasbs:// scheme is recognised) and reading the small wasbs:///hello.txt does not fail, which indicates that wasbs:// is fine up to that point.

The failure seems to happen early inside Beam.

Because of this I passed the JavaSparkContext in via the PipelineOptions (together with the dynamic Hadoop configuration settings suggested in other SO questions/answers), but this did not make a difference for me.
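
As I read the trace, the Configuration I set on the JavaSparkContext never reaches Beam's HadoopFileSystemRegistrar, which builds its own Configuration from core-site.xml/hdfs-site.xml. A minimal sketch of what I mean (untested on my cluster, and assuming HadoopFileSystemOptions from beam-sdks-java-io-hadoop-file-system works as documented; the helper class and method names here are hypothetical) would hand the wasbs-aware Configuration to Beam explicitly:

import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.hadoop.conf.Configuration;

public class BeamWasbsOptions {

    // Hypothetical helper: hands the wasbs-aware Configuration to Beam's
    // HadoopFileSystemRegistrar instead of letting it build a fresh
    // Configuration from core-site.xml/hdfs-site.xml.
    public static Pipeline createPipeline(PipelineOptions base, Configuration config) {
        HadoopFileSystemOptions options = base.as(HadoopFileSystemOptions.class);
        options.setHdfsConfiguration(Collections.singletonList(config));
        return Pipeline.create(options);
    }
}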

Can anyone advise on how to get around this issue?

Not the same situation, but it seems this extended URL scheme trips up folks in other contexts: stackoverflow.com/questions/38254771/… – Kenn Knowles

1 Answer


From quickly digging through code and bug trackers, it looks like Azure is supported as a Hadoop filesystem starting with Hadoop 3.2.0 (code, Jira). Currently Beam is pinned to Hadoop version 2.7.3, which would explain the failure in Beam's HadoopFileSystem.
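
If in doubt, a quick probe run in the same context (just a sketch using stock Hadoop APIs; the class name is made up for illustration) would show which Hadoop version is actually on the classpath and whether the wasbs scheme resolves there:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.VersionInfo;

public class WasbsProbe {
    public static void main(String[] args) throws Exception {
        // Which Hadoop version is actually on the classpath at runtime?
        System.out.println("Hadoop version: " + VersionInfo.getVersion());

        // Does the default Configuration resolve the wasbs scheme at all?
        // This throws "No FileSystem for scheme" if neither fs.wasbs.impl
        // nor a ServiceLoader entry provides an implementation.
        System.out.println("wasbs -> "
            + FileSystem.getFileSystemClass("wasbs", new Configuration()));
    }
}

If that probe throws UnsupportedFileSystemException, the hadoop-azure classes are simply not visible in that context, regardless of what the driver configured.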

It may be that spark-submit succeeded because wasbs:// is supported there via a mechanism other than Hadoop's libraries, or through a bundled, newer version of Hadoop.