2
votes

I need to convert csv.gz files in a folder, both in AWS S3 and HDFS, to Parquet files using Spark (Scala preferred). One of the columns of the data is a timestamp and I only have a week's worth of data. The timestamp format is:

'yyyy-MM-dd hh:mm:ss'

The output I want is a folder (or partition) for every day, containing the Parquet files for that specific date. So there would be 7 output folders or partitions.

I only have a faint idea of how to do this; only sc.textFile comes to mind. Is there a function in Spark that can convert to Parquet? How do I implement this for both S3 and HDFS?

Thanks for your help.


3 Answers

2
votes

If you look into the Spark DataFrame API and the Spark-CSV package, you can achieve most of what you're trying to do: reading the CSV file into a DataFrame and then writing the DataFrame out as Parquet will get you most of the way there.

You'll still need to parse the timestamp and use the result to partition the data.
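
A minimal sketch of that approach on Spark 2.x (built-in CSV reader); the s3a:// and hdfs:// paths below are placeholders, and .gz files are decompressed transparently:

import org.apache.spark.sql.functions.{col, to_date}

// read the gzipped CSVs from S3 (or an hdfs:// path) into a DataFrame
val df = spark.read
  .option("header", "true")
  .csv("s3a://my-bucket/input/")          // hypothetical input location

// derive a date column from the timestamp and write one Parquet folder per day
df.withColumn("date", to_date(col("timestamp")))
  .write
  .partitionBy("date")
  .parquet("hdfs:///data/output/parquet") // hypothetical output location

This produces date=2017-10-01/ style folders, one per day, which is the layout asked for in the question.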

0
votes

Old topic, but I think it is important to answer even old topics if they haven't been answered correctly.

In Spark version >= 2 the CSV package is already included; before that you need to add the Databricks CSV package to your job, e.g. "--packages com.databricks:spark-csv_2.10:1.5.0".
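
For example, on Spark 1.x the package can be pulled in when launching the shell (a sketch; the version coordinates are the ones quoted above), and the source is then addressed as com.databricks.spark.csv:

spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

// on 1.x the reader format is the Databricks one instead of the built-in "csv"
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/test.csv")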

Example csv:

id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12 
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10

First you need to create the Hive table so that the data written by Spark is compatible with the Hive schema. (This might not be needed anymore in future versions.)

create table:

create table part_parq_table (
    id int,
    name string
    )
partitioned by (date string)
stored as parquet
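
If you'd rather issue that DDL from the spark-shell instead of the Hive CLI, the same statement can be run via spark.sql (a sketch, assuming Hive support is enabled in the session):

spark.sql("create table part_parq_table (id int, name string) partitioned by (date string) stored as parquet")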

After you've done that, you can easily read the CSV and save the DataFrame to that table. The second step overwrites the column date with a date string in the format "yyyy-MM-dd". For each of the values a folder will be created containing the matching rows.

SCALA Spark-Shell example:

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") 
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

The first two lines are Hive configurations, which are needed to create partition folders that do not exist yet.

import org.apache.spark.sql.functions.{col, substring}

// read the csv, keep only the yyyy-MM-dd part of the date column, then insert into the partitioned table
var df = spark.read.format("csv").option("header", "true").load("/tmp/test.csv")
df = df.withColumn("date", substring(col("date"), 0, 10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")

After the insert is done you can directly query the table with e.g. "select * from part_parq_table". The folders will be created inside the table folder, on a default Cloudera install e.g. hdfs:///user/hive/warehouse/part_parq_table.
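
To check which daily partitions were created, one option is:

spark.sql("show partitions part_parq_table").show(false)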

Hope that helps. BR

-2
votes

Read the tab-separated CSV file /user/hduser/wikipedia/pageviews-by-second-tsv:

"timestamp"             "site"  "requests"
"2015-03-16T00:09:55"   "mobile"        1595
"2015-03-16T00:10:39"   "mobile"        1544

The following code uses Spark 2.0:

import org.apache.spark.sql.types._

val wikiPageViewsBySecondsSchema = StructType(Array(
  StructField("timestamp", StringType, true),
  StructField("site", StringType, true),
  StructField("requests", LongType, true)))

var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema)
  .option("header", "true").option("delimiter", "\t")
  .csv("/user/hduser/wikipedia/pageviews-by-second-tsv")

Convert the string timestamp to a timestamp type:

wikiPageViewsBySecondsDF = wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")

or

wikiPageViewsBySecondsDF = wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests")

Write into a Parquet file:

wikiPageViewsBySecondsDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
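
To get the per-day folders the question asks for, a possible follow-up (a sketch, assuming the first variant above where the casted column is named timestampTS, and a hypothetical output path) would be:

import org.apache.spark.sql.functions.to_date

// one partition folder per calendar day, e.g. date=2015-03-16/
wikiPageViewsBySecondsDF
  .withColumn("date", to_date($"timestampTS"))
  .write
  .partitionBy("date")
  .parquet("/user/hduser/wikipedia/pageviews-by-second-parquet-daily")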