3
votes

I am trying to read a CSV or TXT file from GCS in a Dataproc PySpark application. I have tried many things. So far the most promising:

#!/usr/bin/python 
import os
import sys
import pyspark
from pyspark.sql import SQLContext
import pandas as pd
from pyspark import SparkContext, SparkConf

os.environ['SPARK_HOME'] = "/usr/lib/spark/"
sys.path.append("/usr/lib/spark/python/")

sc =SparkContext()
sql_sc = SQLContext(sc)

either:

pandas_df = pd.read_csv('{BUCKET}/user2user_relations.csv')
s_df = sql_sc.createDataFrame(pandas_df)

or

data = sc.textFile('gs://{BUCKET}/user2user_relations.csv')

I don't actually need the pandas workaround; I want to end up with an RDD for Spark ALS recommendation. The error I am getting is:

Job [fdcad5bcf77343e2b8782097cd7450cb] submitted.
Waiting for job output...
18/08/08 16:50:20 INFO org.spark_project.jetty.util.log: Logging initialized @2662ms
18/08/08 16:50:20 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/08/08 16:50:20 INFO org.spark_project.jetty.server.Server: Started @2751ms
18/08/08 16:50:20 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@41335d47{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/08/08 16:50:21 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
18/08/08 16:50:21 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at new-try-gcs-pd-m/10.164.0.2:8032
18/08/08 16:50:24 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1533726146765_0018
dataproc-d3d8d55c-05b3-4211-adf2-2014ebdbc20c-europe-west4
go-de-internal
gs://dataproc-d3d8d55c-05b3-4211-adf2-2014ebdbc20c-europe-west4/
gs://dataproc-d3d8d55c-05b3-4211-adf2-2014ebdbc20c-europe-west4/user2user_relations.csv
Traceback (most recent call last):
  File "/tmp/fdcad5bcf77343e2b8782097cd7450cb/pyspark_gcs_acess.py", line 19, in <module>
    pandas_df = pd.read_csv(input_file)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 452, in parser_f
    return _read(filepath_or_buffer, kwds)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 234, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 542, in __init__
    self._make_engine(self.engine)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 679, in _make_engine
    self._engine = CParserWrapper(self.f, **self.options)
  File "/usr/lib/python2.7/dist-packages/pandas/io/parsers.py", line 1041, in __init__
    self._reader = _parser.TextReader(src, **kwds)
  File "parser.pyx", line 332, in pandas.parser.TextReader.__cinit__ (pandas/parser.c:3218)
  File "parser.pyx", line 559, in pandas.parser.TextReader._setup_parser_source (pandas/parser.c:5594)
IOError: File gs://{}/user2user_relations.csv does not exist
18/08/08 16:50:28 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@41335d47{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [fdcad5bcf77343e2b8782097cd7450cb] entered state [ERROR] while waiting for [DONE].

Thanks

2
Can you list your file with gsutil, e.g. gsutil ls gs://<bucket_name>/user2user_relations.csv? - Igor Dvorzhak
yes :) the file exists. I checked that already and also tried to call another file in the dataproc job. same thing - Johanna Schacht
Do you have any special network configuration? VPC for example? And just to be sure, did you try to list file with gsutil after sshing in Dataproc VM? - Igor Dvorzhak
"Do you have any special network configuration? VPC for example?" - no | "And just to be sure, did you try to list file with gsutil after sshing in Dataproc VM?" - yes, the file is there - Johanna Schacht
@JohannaSchacht A clarifying question: is it right to assume that BUCKET is a placeholder that you replace in your code? I ask because the error log suggests that BUCKET is not being replaced: gs://{}/user2user_relations.csv, as reported in the "does not exist" error, is not a valid GCS bucket location - brotich

2 Answers

0
votes

pandas will not read directly from GCS using the URI you provided, unlike Spark on Dataproc, which has the GCS connector installed by default.

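If you want to read the same blob with pandas, I suggest one of the following:

  • Download the blob with the google-cloud-storage client and hand its contents to pandas as a file-like object:

import io
import pandas as pd
from google.cloud import storage

client = storage.Client()
# https://console.cloud.google.com/storage/browser/[bucket-id]/
bucket = client.get_bucket('bucket-id-here')
# Then do other things...
blob = bucket.get_blob('remote/path/to/file.txt')

# read_csv expects a path or a file-like object, so wrap the downloaded bytes
df = pd.read_csv(io.BytesIO(blob.download_as_string()))

  • Or copy the file to a local directory first, using the command gsutil cp gs://bucket/blob /local/folder/blob, and then read the local copy:

df = pd.read_csv('/local/folder/blob')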
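
A downloaded blob arrives as raw bytes in memory, not as a path, so whichever parser consumes it needs a file-like wrapper around those bytes. Here is a minimal sketch of that idea using only the standard library. The helper name rows_from_csv_bytes and the sample rows are invented for illustration, and the hard-coded payload stands in for what the blob download would return:

```python
import csv
import io


def rows_from_csv_bytes(payload, encoding="utf-8"):
    """Parse CSV content held in memory as bytes into a list of rows."""
    # Decode the bytes and expose them through a file-like text stream.
    text_stream = io.StringIO(payload.decode(encoding))
    # Skip empty lines so a trailing newline does not produce an empty row.
    return [row for row in csv.reader(text_stream) if row]


# Invented sample standing in for the bytes returned by the blob download.
sample = b"1,2,0.5\n3,4,1.0\n"
rows = rows_from_csv_bytes(sample)
print(rows)
```

The same wrapping applies when pandas does the parsing: pass it a file-like object built from the bytes rather than the bytes themselves.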

Hope it helps.

0
votes

You are missing the BUCKET value. Either replace {BUCKET} with your bucket name or set the variable and substitute it into the path:

data = sc.textFile('gs://{BUCKET}/user2user_relations.csv')
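
As a rough sketch of what "set the variable" could look like, the path can be built with str.format and checked before it reaches Spark, so an unfilled placeholder like the gs://{}/... in the error log fails early. The helper names gcs_uri and parse_relation, the bucket name "my-bucket", and the three-column layout (two integer ids and a numeric weight) are all assumptions made for illustration, not taken from the question:

```python
def gcs_uri(bucket, blob_name):
    """Build a gs:// URI, rejecting an empty or unfilled '{...}' bucket name."""
    if not bucket or "{" in bucket or "}" in bucket:
        raise ValueError("bucket name is empty or still a placeholder: %r" % (bucket,))
    return "gs://{}/{}".format(bucket, blob_name.lstrip("/"))


def parse_relation(line):
    """Split one CSV line into (user_a, user_b, weight); assumed column layout."""
    user_a, user_b, weight = line.strip().split(",")
    return int(user_a), int(user_b), float(weight)


# "my-bucket" is a hypothetical bucket name; use your own.
path = gcs_uri("my-bucket", "user2user_relations.csv")
print(path)

# On the cluster, with the SparkContext `sc` from the question (not executed here):
# relations = sc.textFile(path).map(parse_relation)
```

Mapping each line to a tuple yields an RDD directly, so pandas is not needed on this route. If the file has a header row, it would have to be filtered out before parse_relation is applied.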