I had to unzip files from Amazon S3 onto the driver node of my Spark cluster, and I need to load all of these CSV files as a Spark DataFrame. However, I ran into the following problem when I tried to load the data from the driver node:
PySpark:
df = self.spark.read.format("csv").option("header", True).load("file:/databricks/driver/*.csv")
'Path does not exist: file:/folder/*.csv'
I tried to move all these files to DBFS using dbutils.fs.mv(), but I'm running a Python file and I can't use dbutils there. I think I need to broadcast the files, but I don't know how: I tried self.sc.textFile("file:/databricks/driver/*.csv").collect() and self.sc.addFile("file:/databricks/driver/*.csv"), and in both cases the process is not able to find the files.
UPDATE: I ran this code to check where the files are:
import os
BaseLogs("INFO", os.getcwd())
folders = []
for r, d, f in os.walk(os.getcwd()):
    for folder in d:
        folders.append(os.path.join(r, folder))
for f in folders:
    BaseLogs("INFO", f)
BaseLogs("INFO", os.listdir("/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip"))
BaseLogs("INFO", os.listdir("/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Opens_20190907.zip"))
Then I tried to do:
try:
    df = self.spark.read.format("csv").option("header", True).option("inferSchema", "true").load("file:///databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip/Bounces.csv")
except Exception as e:
    BaseLogs("INFO", e)
    BaseLogs("INFO", "Reading {0} as Spark Dataframe".format("file://" + file + ".csv"))
    df = self.spark.read.format("csv").option("header", True).option("inferSchema", "true").load("file://" + file + ".csv")
I got the following error:
2019-10-24T15:16:25.321+0000: [GC (Allocation Failure) [PSYoungGen: 470370K->14308K(630272K)] 479896K->30452K(886784K), 0.0209171 secs] [Times: user=0.04 sys=0.01, real=0.02 secs]
2019-10-24T15:16:25.977+0000: [GC (Metadata GC Threshold) [PSYoungGen: 211288K->20462K(636416K)] 227432K->64316K(892928K), 0.0285984 secs] [Times: user=0.04 sys=0.02, real=0.02 secs]
2019-10-24T15:16:26.006+0000: [Full GC (Metadata GC Threshold) [PSYoungGen: 20462K->0K(636416K)] [ParOldGen: 43854K->55206K(377344K)] 64316K->55206K(1013760K), [Metaspace: 58323K->58323K(1099776K)], 0.1093583 secs] [Times: user=0.31 sys=0.02, real=0.12 secs]
2019-10-24T15:16:28.333+0000: [GC (Allocation Failure) [PSYoungGen: 612077K->23597K(990720K)] 667283K->78811K(1368064K), 0.0209207 secs] [Times: user=0.02 sys=0.01, real=0.02 secs]
INFO: An error occurred while calling o195.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 172.31.252.216, executor 0): java.io.FileNotFoundException: File file:/databricks/driver/zipFiles/s3Sensor/2017/Tracking_Bounces_20190906.csv.zip/Bounces.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:248)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
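The trace shows that the failing task ran on executor 0 (172.31.252.216), not on the driver, and a file:/ path is resolved against the local disk of whichever node runs the task. Below is a minimal sketch of one possible workaround: copy the unzipped CSVs from the driver's disk into a directory that every node can see, then read from there. It assumes the cluster exposes DBFS through the /dbfs local mount. The helper name copy_csvs_to_shared_dir and the target directory /dbfs/tmp/unzipped are hypothetical, chosen only for illustration.

```python
import glob
import os
import shutil


def copy_csvs_to_shared_dir(local_dir, shared_dir):
    """Copy every *.csv file found under local_dir (recursively) into
    shared_dir, keeping the relative folder layout so that files with the
    same name in different folders do not overwrite each other.
    Returns the list of destination paths."""
    copied = []
    pattern = os.path.join(local_dir, "**", "*.csv")
    for src in sorted(glob.glob(pattern, recursive=True)):
        if not os.path.isfile(src):
            continue  # skip directories whose name happens to end in .csv
        dst = os.path.join(shared_dir, os.path.relpath(src, local_dir))
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.copy(src, dst)
        copied.append(dst)
    return copied


# Hypothetical usage on the driver (assumes /dbfs is mounted; paths are examples):
#   copy_csvs_to_shared_dir("/databricks/driver/zipFiles", "/dbfs/tmp/unzipped")
#   df = spark.read.format("csv").option("header", True).load("dbfs:/tmp/unzipped/...")
```

The copy itself uses only the standard library, so it does not depend on dbutils being available in a plain Python file. Whether the /dbfs mount is writable from the driver depends on the cluster configuration, so that part is an assumption to verify.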

file:///databricks/driver/*.csv. Did you explicitly save the unzipped files under that directory? - etherealyn