
I'm working with the latest version of Spark (2.1.1). I read multiple CSV files into a DataFrame with spark.read.csv. After processing this DataFrame, how can I save it to output CSV files with specific names?

For example, there are 100 input files (in1.csv, in2.csv, in3.csv, ..., in100.csv). The rows that belong to in1.csv should be saved as in1-result.csv, the rows that belong to in2.csv as in2-result.csv, and so on. (The default file names look like part-xxxx-xxxxx, which is not readable.)

I have seen partitionBy(col), but it looks like it can only partition by a column.

Another question: I want to plot my DataFrame. Spark has no built-in plotting library. Many people use df.toPandas() to convert to pandas and plot that. Is there a better solution? My data is very big, so toPandas() will cause a memory error. Also, I'm working on a server and want to save the plot as an image instead of displaying it.

I'm not asking how to save it normally to disk. I want to save it with a specific name, e.g. the rows from in1.csv should be written as in1-result.csv (exactly this name, not part-xxxx). – Sirapat

Spark uses Hadoop under the hood, and unless you save it as a Hadoop file using MultipleTextOutputFormat and RDDs of strings, there is no out-of-the-box solution for this in Spark. – eliasah

When it comes to plotting, I think toPandas is a good approach, but you need to aggregate your DataFrame first so that converting to pandas is not a big issue. You don't want to plot all rows of the DataFrame anyway, because it would not be readable. – Piotr Kalański

Thank you for the comments, both of you. – Sirapat
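Building on Piotr Kalański's comment, a minimal sketch of the aggregate-first approach could look like this (the groupBy column, output file name, and use of matplotlib are illustrative assumptions, not prescribed by the thread):

import matplotlib
matplotlib.use("Agg")  # headless server: render to a file, no display needed
import matplotlib.pyplot as plt

# Aggregate in Spark first so only a small summary is converted to pandas.
# "some_column" is a hypothetical column name.
summary = df.groupBy("some_column").count().toPandas()

summary.plot(kind="bar", x="some_column", y="count")
plt.savefig("plot.png")  # save the plot as an image instead of showing it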

1 Answer


I suggest the following approach for writing the DataFrame into separate directories related to each input file:

  • in a loop, for each file:
    • read the CSV file
    • add a new column with the input file name using the withColumn transformation
    • union all the DataFrames using the union transformation
  • do the required preprocessing
  • save the result using partitionBy, passing the column with the input file information, so that rows from the same input file are saved in the same output directory

The code could look like this:

from pyspark.sql.functions import lit

all_df = None
for file in files:  # files is the list of input CSV files to read
    df = spark.read.csv(file)
    # lit() wraps the Python string in a Column; withColumn returns a
    # new DataFrame, so the result must be assigned back
    df = df.withColumn("input_file", lit(file))
    if all_df is None:
        all_df = df
    else:
        all_df = all_df.union(df)

# do the required preprocessing on all_df, producing result

# partitionBy takes column names as strings; rows tagged with the same
# input file end up in the same output directory (input_file=<name>)
result.write.partitionBy("input_file").csv(outdir)
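Note that partitionBy writes each group into a directory named like input_file=in1.csv that still contains part-xxxx files, rather than a file literally named in1-result.csv. If the output goes to the local filesystem, one way to get the exact names is a post-processing rename; a minimal sketch, assuming each partition directory holds exactly one part file (e.g. because the DataFrame was written with coalesce(1)):

import glob, os, shutil

# Assumes outdir is a local path and each input_file=... directory
# contains exactly one part file.
for part_dir in glob.glob(os.path.join(outdir, "input_file=*")):
    src = glob.glob(os.path.join(part_dir, "part-*"))[0]
    input_name = os.path.basename(part_dir).split("=", 1)[1]  # e.g. "in1.csv"
    base = os.path.splitext(input_name)[0]                    # "in1"
    shutil.move(src, os.path.join(outdir, base + "-result.csv"))
    shutil.rmtree(part_dir)

For output on HDFS, the same renaming would have to go through the Hadoop FileSystem API instead of shutil.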