6
votes

I am new to Spark and I code in Python.

Following exactly my "Learning Spark" guidelines, I see "You don't need to have Hadoop installed to run Spark"

Yet when I simply try to count the lines in one file using Pyspark I get the following error. What am I missing?

>>> lines = sc.textFile("README.md")
15/02/01 13:27:12 INFO MemoryStore: ensureFreeSpace(32728) called with curMem=0,
 maxMem=278019440
15/02/01 13:27:12 INFO MemoryStore: Block broadcast_0 stored as values in memory
 (estimated size 32.0 KB, free 265.1 MB)
>>> lines.count()
15/02/01 13:27:18 WARN NativeCodeLoader: Unable to load native-hadoop library fo
r your platform... using builtin-java classes where applicable
15/02/01 13:27:18 WARN LoadSnappy: Snappy native library not loaded
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\pyspark\rdd.py", line 847, in co
unt
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\pyspark\rdd.py", line 838, in su
m
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\pyspark\rdd.py", line 759, in re
duce
    vals = self.mapPartitions(func).collect()
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\pyspark\rdd.py", line 723, in co
llect
    bytesInJava = self._jrdd.collect().iterator()
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\lib\py4j-0.8.2.1-src.zip\py4j\ja
va_gateway.py", line 538, in __call__
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\lib\py4j-0.8.2.1-src.zip\py4j\pr
otocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.collect.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: fil
e:/C:/Spark/spark-1.1.0-bin-hadoop1/bin/README.md
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.j
ava:197)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.ja
va:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:5
6)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1135)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:774)
        at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala
:305)
        at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)

>>> lines.first()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\pyspark\rdd.py", line 1167, in f
irst
    return self.take(1)[0]
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\pyspark\rdd.py", line 1126, in t
ake
    totalParts = self._jrdd.partitions().size()
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\lib\py4j-0.8.2.1-src.zip\py4j\ja
va_gateway.py", line 538, in __call__
  File "C:\Spark\spark-1.1.0-bin-hadoop1\python\lib\py4j-0.8.2.1-src.zip\py4j\pr
otocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o20.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: fil
e:/C:/Spark/spark-1.1.0-bin-hadoop1/bin/README.md
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.j
ava:197)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.ja
va:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.sc
ala:50)
        at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)

>>>
3
You can change the title of the question. The WARN from Hadoop can be ignored. These are just messages from the Hadoop client library, which is still used, but does not mean you need Hadoop running. Javier's answer is correct.Sean Owen

3 Answers

4
votes

I have not tried to run spark in a Windows system, but it seems to me that the problem is:

py4j.protocol.Py4JJavaError: An error occurred while calling o26.collect. : org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: fil e:/C:/Spark/spark-1.1.0-bin-hadoop1/bin/README.md

You have to refer correctly the file to load. If you run pyspark from spark folder (i.e.: C:\spark), then lines = sc.textFile("README.md") is correct. But if you run pyspark from bin (i.e.: C:\spark\bin) you have to refer it the: lines = sc.textFile("../README.md"), or use the absolute path to the file.

2
votes

This is the solution for this error that i was getting on Spark cluster that is hosted in windows:

Load the raw HVAC.csv file, parse it using the function

data = sc.textFile("wasb:///HdiSamples/SensorSampleData/hvac/HVAC.csv")

We use (wasb:///) to allow Hadoop to access azure blog storage file and the three slashes is a relative reference to the running node container folder.

For example: If the path for your file in File Explorer in Spark cluster dashboard is:

sflcc1\sflccspark1\HdiSamples\SensorSampleData\hvac

So to describe the path is as follows: sflcc1: is the name of the storage account. sflccspark: is the cluster node name.

So we refer to the current cluster node name with the relative three slashes.

Hope this helps.

1
votes

I am a little late to the party. I had a similar problem (ec2 spark cluster). In my case, hdfs dint have the file I was looking for. Thus, I had to manually add the files I wanted using the following command

~/ephemeral-hdfs/bin/hadoop fs -put /dir/filename.txt filename.txt

hopefully that was helpful.