I'm trying to use apache Spark sc.wholeTextFiles() on file that is stored amazon S3 I'm getting following Error:
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
Traceback (most recent call last):
File "/root/distributed_rdd_test.py", line 27, in <module>
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
File "/root/spark/python/pyspark/rdd.py", line 1126, in take
totalParts = self._jrdd.partitions().size()
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
My code is following:
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput')
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
for item in result:
print item.getvalue()
sc.stop()
So my question is, is it possible to read whole files from S3 in Spark? Based on the documentation it should be possible, but it seems that it does not work for me.
When I do just:
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput').take(10)
print distData
Then the error that I'm getting is exactly the same.
EDIT:
Of course I've tried sc.textFile('s3n://wiki-dump/wikiinput'), which reads the file without any problems.
EDIT2:
I've tried also to run the same code from Scala and I'm getting still the same error. Particularly I'm trying to run val file = sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first()