0
votes

I'm trying to load a GeoJSON file with Spark and the Magellan library. My loading code is:

val polygons = spark.read.format("magellan").option("type", "geojson").load(inJson)

where inJson is the path to my JSON file on S3: s3n://bucket-name/geojsons/file.json

Error with stack trace:

0.3 in stage 0.0 (TID 3, ip-172-31-19-102.eu-west-1.compute.internal, executor 1): java.lang.IllegalArgumentException: Wrong FS: s3n://bucket-name/geojsons/file.json, expected: hdfs://ip-172-31-27-182.eu-west-1.compute.internal:8020
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:653)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:773)
    at magellan.mapreduce.WholeFileReader.nextKeyValue(WholeFileReader.scala:45)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:199)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
    at scala.collection.AbstractIterator.fold(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The problem occurs only when I run the job on more than one machine: it works fine on an EMR cluster with a master and 1 instance in the core group, but fails with the error above with 10 instances in the core group.


2 Answers

2
votes

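This turned out to be a problem in Magellan's WholeFileReader. It was getting the default FileSystem (HDFS on this cluster) instead of the one that matches the path's s3n:// scheme, which is why opening the file fails with "Wrong FS".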

It was solved with this pull request

The fix was:

-      val fs = FileSystem.get(conf)
+      val fs = path.getFileSystem(conf)
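
To make the difference between the two calls concrete, here is a minimal sketch, assuming hadoop-common is on the classpath. The object FsResolution and its method names are hypothetical and are not part of Magellan; only FileSystem.get(conf) and path.getFileSystem(conf) come from the diff above.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper for illustration only; not part of Magellan.
object FsResolution {

  // Old behaviour: returns the filesystem configured as fs.defaultFS,
  // regardless of which path is about to be opened.
  def defaultFs(conf: Configuration): FileSystem =
    FileSystem.get(conf)

  // Fixed behaviour: returns the filesystem selected by the path's own
  // scheme and authority (for example s3n://bucket-name or hdfs://host:8020).
  def owningFs(path: Path, conf: Configuration): FileSystem =
    path.getFileSystem(conf)
}
```

On a cluster whose fs.defaultFS points at HDFS, passing an s3n:// path to the filesystem returned by defaultFs fails the checkPath call seen at the top of the stack trace, while owningFs resolves the filesystem from the path itself.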
0
votes

If you are running on EMR, you can just use "s3://bucket/path" instead of "s3n://...."