16
votes

I am new to Spark and am trying to read a local CSV file on an EMR cluster. The file is located in /home/hadoop/. This is the script I'm using:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv('/home/hadoop/observations_temp.csv', header=True)

When I run the script, it raises the following error:

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv

Then, I found out that I have to add file:// in the file path so it can read the file locally:

df = spark.read.csv('file:///home/hadoop/observations_temp.csv', header=True)

But this time, the above approach raised a different error:

Lost task 0.3 in stage 0.0 (TID 3,
ip-172-31-41-81.eu-west-1.compute.internal, executor 1): java.io.FileNotFoundException: File file:/home/hadoop/observations_temp.csv does not exist

I think this is because the file:// scheme only reads the file from the local filesystem and does not distribute it to the other nodes.

Do you know how I can read the CSV file and make it available to all the other nodes?

3
And, as an aside, I found that storing data files in S3 made life a bit simpler, once you have granted your cluster access to your bucket(s). I know this doesn't solve your problem directly, but I thought I'd mention it anyway. – ImDarrenG
Yes, it is located on the instance that runs the driver (the master node, if that's what you mean). – ebertbm
The original file is in S3, but I have to download it first to be able to process it and convert it to another format. The output is the file that I'm trying to read. – ebertbm
How are you downloading the file? – ImDarrenG
I download the file with the AWS CLI: os.system("aws s3 cp s3://raw_data/files/observation.protob /home/hadoop/mount_point/s3"). I download it to a different volume (because of the file size). From there I can read it and generate the output file in /home/hadoop/. – ebertbm

3 Answers

21
votes

You are right: the file is missing from your worker nodes, and that is what raises the error you got.

From the official documentation (see External Datasets):

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
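To make the two errors in the question concrete, the sketch below roughly imitates how a path without a scheme is resolved against the cluster's default filesystem (the fs.defaultFS setting), while a path with an explicit scheme such as file:// is left alone. The function name qualify is hypothetical, and the logic is a simplification that only handles absolute paths; it is not Hadoop's actual implementation.

```python
from urllib.parse import urlparse


def qualify(path, default_fs):
    """Roughly imitate how a scheme-less absolute path is resolved.

    `default_fs` stands in for the cluster's fs.defaultFS value.
    This is an illustrative simplification, not Hadoop's real logic.
    """
    if urlparse(path).scheme:
        # An explicit scheme (file://, hdfs://, s3://) is kept as given.
        return path
    # No scheme: the path is interpreted on the default filesystem.
    return default_fs.rstrip("/") + "/" + path.lstrip("/")
```

Under these assumptions, /home/hadoop/observations_temp.csv turns into an hdfs:// path (hence "Path does not exist"), whereas the file:// path is handed unchanged to every executor, and each executor then looks for the file on its own local disk.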

So basically you have two options:

Copy the file to each worker node before starting the job; or

Upload it to HDFS (the recommended solution) with something like:

hadoop fs -put localfile /user/hadoop/hadoopfile.csv

Now you can read it with :

df = spark.read.csv('/user/hadoop/hadoopfile.csv', header=True)
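If you would rather do both steps from the same Python script, a minimal sketch could look like the following. The helper names build_put_command and upload_and_read are hypothetical; the sketch assumes the hadoop CLI is on the PATH of the node running the driver and that spark is an existing SparkSession.

```python
import subprocess


def build_put_command(local_path, hdfs_path):
    """Build the same `hadoop fs -put` command shown above as an argument list."""
    return ["hadoop", "fs", "-put", local_path, hdfs_path]


def upload_and_read(spark, local_path, hdfs_path):
    """Copy a local file into HDFS, then read it as a CSV DataFrame.

    `spark` is assumed to be an already created SparkSession.
    """
    # check=True raises CalledProcessError if the upload fails.
    subprocess.run(build_put_command(local_path, hdfs_path), check=True)
    # A path without a scheme is read from the cluster's default filesystem.
    return spark.read.csv(hdfs_path, header=True)
```

For example, upload_and_read(spark, '/home/hadoop/observations_temp.csv', '/user/hadoop/hadoopfile.csv') would run the upload and return the DataFrame, provided the destination does not already exist.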

It seems that you are also using AWS S3. You can always try to read the file directly from S3 without downloading it (with the proper credentials, of course).
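As a rough sketch of reading straight from S3: on EMR the s3:// scheme is normally available to Spark, while outside EMR the scheme is typically s3a:// and needs extra configuration. The helper names s3_uri and read_csv_from_s3 are hypothetical, as are any bucket and key you pass in, and the cluster is assumed to have permission to read the bucket.

```python
def s3_uri(bucket, key):
    """Build an s3:// URI from a bucket name and an object key."""
    return "s3://{}/{}".format(bucket, key.lstrip("/"))


def read_csv_from_s3(spark, bucket, key):
    """Read a CSV object from S3 into a DataFrame.

    `spark` is assumed to be an existing SparkSession on a cluster
    that already has read access to the bucket.
    """
    return spark.read.csv(s3_uri(bucket, key), header=True)
```

Because every executor reads from S3 itself, nothing has to be copied to the worker nodes first.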

Some suggest passing the file with the --files option of spark-submit, which uploads it to the executors' working directories. I don't recommend this approach unless your CSV file is very small, in which case you probably don't need Spark.

Otherwise, I would stick with HDFS (or any distributed file system).

2
votes

I think what you are missing is explicitly setting the master when initializing the SparkSession. Try something like this:

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Then read the file the same way you have been doing:

df = spark.read.csv('file:///home/hadoop/observations_temp.csv')

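This should solve the problem: with master("local"), Spark runs the whole job in a single process on the machine where the session is created, so the file only has to exist there. Note that the job then does not use the cluster's worker nodes.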

0
votes

This might be useful for someone running Zeppelin on a Mac using Docker.

  1. Copy files to custom folder : /Users/my_user/zeppspark/myjson.txt

  2. docker run -p 8080:8080 -v /Users/my_user/zeppspark:/zeppelin/notebook --rm --name zeppelin apache/zeppelin:0.9.0

  3. On Zeppelin you can run this to get your file:

%pyspark

json_data = sc.textFile('/zeppelin/notebook/myjson.txt')