
I am reading .txt files using wholeTextFiles() in PySpark. I know that the RDD returned by wholeTextFiles() has the format (filepath, content). I have multiple files to read. I want to extract the file name from the filepath, save it to a Spark DataFrame, and use the date part of the filename as a date folder in the HDFS output location. But while saving, I am not getting the corresponding filenames. Is there any way to do this? Below is my code:

base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")

data1 = base_data.map(lambda x : x[0]).flatMap(lambda x : x.split('/')).filter(lambda x : x.startswith('CH'))

data2=data1.flatMap(lambda x : x.split('F_')).filter(lambda x : x.startswith('2'))

print(data1.collect())

print(data2.collect())

df.repartition(1).write.mode('overwrite').parquet(outputLoc + "/xxxxx/" + data2)

logdf = sqlContext.createDataFrame(
    [(data1, pstrt_time, pend_time, 'DeltaLoad Completed')],
    ["filename","process_start_time", "process_end_time", "status"])`

Output:

data1: ['CHNC_P0BCDNAF_20200217', 'CHNC_P0BCDNAF_20200227', 'CHNC_P0BCDNAF_20200615', 'CHNC_P0BCDNAF_20200925']

data2: ['20200217', '20200227', '20200615', '20200925']

1 Answer


Here is a Scala version that you can easily convert to PySpark yourself:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType

// (filepath, content) pairs for every matching file
val files = sc.wholeTextFiles("/FileStore/tables/*ZZ.txt", 0)

// Tokenise each file's content, keeping the filepath alongside the tokens
val res1 = files.map(line => (line._1, line._2.split("\n").flatMap(x => x.split(" "))))

// Flatten to one (filepath, token) pair per token, so the path travels with every row
val res2 = res1.flatMap {
  case (path, tokens) => tokens.map(token => (path, token))
}

// Carry the path, the filename segment of the path, and the token
val res3 = res2.map(line => (line._1, line._1.split("/")(3), line._2))
val df = res3.toDF()

val df2 = df.withColumn("s", split($"_1", "/"))
            .withColumn("f1", $"s"(3))
            .withColumn("f2", $"f1".cast(StringType)) // avoid issues with a subsequent split
            .withColumn("filename", substring_index(col("f2"), ".", 1)) // drop the extension
df2.show(false)

// repartition by filename avoids the default 200 shuffle partitions;
// consider adding partitionBy on the write as well, for good measure.
df2.repartition($"filename").write.mode("overwrite").parquet("my_parquet")

Some sample output; you can strip away the intermediate columns via .drop or a select:

+--------------------------------+---------+-------+-------------------------------------+---------+---------+--------+
|_1                              |_2       |_3     |s                                    |f1       |f2       |filename|
+--------------------------------+---------+-------+-------------------------------------+---------+---------+--------+
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|wwww   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|rrr    |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|       |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |
|dbfs:/FileStore/tables/AAAZZ.txt|AAAZZ.txt|4445   |[dbfs:, FileStore, tables, AAAZZ.txt]|AAAZZ.txt|AAAZZ.txt|AAAZZ   |

...

The usual punctuation removal and whitespace trimming still apply. You will of course need to adapt this to your filename format, which I cannot see.
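
In PySpark, for example, those clean-ups are a one-liner on the DataFrame. A minimal sketch, assuming a DataFrame df with a string column named content (the column name is hypothetical):

from pyspark.sql.functions import regexp_replace, trim

# Strip punctuation, then trim surrounding whitespace ('content' is a hypothetical column name)
df_clean = df.withColumn("content", trim(regexp_replace("content", r"[^\w\s]", "")))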

The issue in your original code is that you cannot split on something that has already been split apart: once you collect data1 and data2 as separate lists, the pairing between each filename and its date is lost.
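
For completeness, a rough PySpark sketch of that conversion, assuming paths like /user/nikhil/raw_data/CHNC_P0BCDNAF_20200217.txt (matching the output in your question) and that sc, sqlContext and outputLoc are defined as in your snippet; treat it as an illustration, not a drop-in implementation:

import os

base_data = sc.wholeTextFiles("/user/nikhil/raw_data/")

# Keep each filename paired with its content instead of collecting them separately.
# os.path.basename drops the directory part; the split drops the extension.
pairs = base_data.map(lambda x: (os.path.basename(x[0]).split('.')[0], x[1]))

# Derive the date from the filename, e.g. 'CHNC_P0BCDNAF_20200217' -> '20200217'
rows = pairs.map(lambda x: (x[0], x[0].split('_')[-1], x[1]))

df = sqlContext.createDataFrame(rows, ["filename", "file_date", "content"])

# partitionBy writes one subfolder per distinct date (file_date=20200217, ...),
# so every row keeps its filename and lands in the right date folder.
df.write.mode("overwrite").partitionBy("file_date").parquet(outputLoc + "/xxxxx")

Because the filename travels with the content through every transformation, the pairing is never lost, and partitionBy gives you the per-date folders without looping over the dates yourself.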