27
votes

I want to create a data processing pipeline in AWS to eventually use the processed data for Machine Learning.

I have a Scala script that takes raw data from S3, processes it and writes it to HDFS or even S3 with spark-csv. I think I can use multiple files as input if I want to use the AWS Machine Learning tool for training a prediction model. But if I want to use something else, I presume it is best to have a single CSV output file.

Currently, as I do not want to use repartition(1) or coalesce(1) for performance reasons, I have used hadoop fs -getmerge for manual testing, but as it just merges the contents of the job output files, I am running into a small problem: I need a single row of headers in the data file for training the prediction model.

If I use .option("header","true") with spark-csv, it writes the headers to every output file, and after merging I have as many header lines in the data as there were output files. But if the header option is false, then it does not add any headers at all.

Now I have found an option to merge the files inside the Scala script with the Hadoop API FileUtil.copyMerge. I tried this in spark-shell with the code below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val configuration = new Configuration()
val fs = FileSystem.get(configuration)
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

But this solution still just concatenates the files on top of each other and does not handle headers. How can I get an output file with only one row of headers?

I even tried adding df.columns.mkString(",") as the last argument for copyMerge, but this still added the headers multiple times, not once.

I'm also facing the same issue. Is this fixed? - senthil kumar p
How about filtering the DataFrame to zero rows, exporting that with header=true, exporting the rest of the data with header=false and then merging the header with the partitions? - Boern
@Boern this may work, although I think it would require copying the headers file to the same output location as the data and making sure it is always the first file. I think the current solution wouldn't allow writing into the same path. Of course, appending might solve that problem; I need to try and play around with it for a while. - V. Samma
@bleka The "how" of that is the point of this question. One could imagine a flag to spark that tells it to only save a header with the file designated part-0000, or perhaps an intelligent concatenation that combines the files saved by multiple workers but only keeps the header from one of them. copyMerge looks like it just combines files, so if the files have headers the header will appear multiple times, or if the files lack headers there will be no header at all, as V. Samma says in the question. Or does copyMerge have different behavior in your answer? - Kyle Heuton
@belka these aren't different dataframes with different columns though, these are just different partitions of the same dataframe with the same columns - Kyle Heuton

6 Answers

6
votes

You can work around it like this.

  1. Create a new DataFrame (headerDF) containing the header names.
  2. Union it with the DataFrame (dataDF) containing the data.
  3. Write the unioned DataFrame to disk with option("header", "false").
  4. Merge the partition files (part-0000*.csv) using Hadoop FileUtil.

This way, no partition has a header except for the single partition that contains the row of header names from headerDF. When all partitions are merged together, there is a single header at the top of the file. Sample code follows:

  //imports needed below
  import scala.collection.JavaConverters._
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  import org.apache.spark.sql.{Row, SaveMode}

  //dataFrame is the data to save on disk
  //cast all columns to String so the header row fits the schema
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  //create a new data frame containing only the header names
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  //prepend the header row to the data and write without the built-in header
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  //use hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, sparkSession.sparkContext.hadoopConfiguration, null)
1
votes
  1. Build the header from the dataframe's schema: val header = dataDF.schema.fieldNames.reduce(_ + "," + _)
  2. Create a file containing the header on DSEFS.
  3. Append all the (headerless) partition files to the file from step 2 using the Hadoop FileSystem API, as sketched below.
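
A minimal sketch of steps 2 and 3, assuming dataDF from step 1; the paths (/output/parts for the headerless part files, /output/target.csv for the result) are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

val conf = new Configuration()
val fs = FileSystem.get(conf)

// step 2: create the target file, starting with the header row
val header = dataDF.schema.fieldNames.reduce(_ + "," + _)
val out = fs.create(new Path("/output/target.csv"), true) // true = overwrite
out.write((header + "\n").getBytes("UTF-8"))

// step 3: append each headerless part file to the target
fs.globStatus(new Path("/output/parts/part-*")).foreach { status =>
  val in = fs.open(status.getPath)
  IOUtils.copyBytes(in, out, conf, false) // false = keep the output stream open
  in.close()
}
out.close()
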
0
votes

To merge files in a folder into one file:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

// concatenates every file under srcPath into a single file at dstPath
def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
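
For example, with hypothetical source and destination paths:

merge("output/csv-folder", "output/merged.csv")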

If you want to merge all files into one file, but still in the same folder (note that this pulls all the data into a single partition on one worker, not the driver):

dataFrame
      .coalesce(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(out) // "out" is the output folder path

Another solution would be to use solution #2, then move the single file inside the folder to another path (with the name of our CSV file).

import java.io.File
import org.apache.spark.sql.DataFrame

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
    val tmpDir = "tmpDir"

    // write a single partition to a temporary folder
    df.repartition(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", header.toString)
      .option("delimiter", sep)
      .save(tmpDir)

    // rename the single part file to the desired file name
    val dir = new File(tmpDir)
    val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
    (new File(tmpCsvFile)).renameTo(new File(fileName))

    // clean up the temporary folder
    dir.listFiles.foreach( f => f.delete )
    dir.delete
}
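
Usage would look like this (resultDF and the target path are hypothetical):

df2csv(resultDF, "/home/user/result.csv", sep = ",", header = true)
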
0
votes

Try specifying the schema of the header and reading all the files from the folder using spark-csv's DROPMALFORMED mode. This should let you read all the files in the folder while keeping only the headers (because you drop the malformed rows). Example:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val headerSchema = List(
  StructField("example1", StringType, true),
  StructField("example2", StringType, true),
  StructField("example3", StringType, true)
)

val header_DF = sqlCtx.read
  .option("delimiter", ",")
  .option("header", "false")
  .option("mode", "DROPMALFORMED")
  .option("inferSchema", "false")
  .schema(StructType(headerSchema))
  .format("com.databricks.spark.csv")
  .load("folder containing the files")

In header_DF you will have only the header rows; from this you can transform the dataframe the way you need, for example as sketched below.
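
One possible continuation, assuming you want the recovered header row to name the columns of the full data set (header_DF.first() picks one of the recovered header rows; the folder path is illustrative):

// use the values of one recovered header row as column names
val headerNames = header_DF.first().toSeq.map(_.toString)

val dataDF = sqlCtx.read
  .option("delimiter", ",")
  .option("header", "false")
  .format("com.databricks.spark.csv")
  .load("folder containing the files")
  .toDF(headerNames: _*)
// note: the embedded header rows are still present as data rows and would need filtering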

0
votes

We had a similar issue and used the approach below to get a single output file:

  1. Write the dataframe to HDFS with headers, without using coalesce or repartition (after the transformations):

dataframe.write.format("csv").option("header", "true").save(hdfs_path_for_multiple_files)

  2. Read the files from the previous step and write them back to a different location on HDFS with coalesce(1):

dataframe = spark.read.option('header', 'true').csv(hdfs_path_for_multiple_files)

dataframe.coalesce(1).write.format('csv').option('header', 'true').save(hdfs_path_for_single_file)

This way, you avoid the performance issues related to coalesce or repartition during the execution of the transformations (step 1), and the second step produces a single output file with one header line.

-4
votes
 // Convert the dataframe to CSV and save as text files
        outputDataframe.write()
                .format("com.databricks.spark.csv")
                // header => true writes a header line into each part file
                .option("header", "true")
                .save("output/path"); // hypothetical output path

Please follow the link below for an integration test on how to write a single header:

http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/