
I have a requirement to validate an ingest operation. Basically, I have two big files within HDFS: one is Avro formatted (the ingested files), the other is Parquet formatted (the consolidated file).

The Avro file has this schema:

filename, date, count, afield1,afield2,afield3,afield4,afield5,afield6,...afieldN

The Parquet file has this schema:

fileName, anotherField1, anotherField2, anotherField3, anotherField4, ..., anotherFieldN

If I try to load both files into DataFrames and then use a naive join-where, the job on my local machine takes more than 24 hours, which is unacceptable.

ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count()

Which is the best way to achieve this? Dropping columns from the DataFrames before doing the join-where-count? Calculating the counts per DataFrame and then joining and summing?
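For example, by dropping columns before the join I mean something like the following, just a sketch reusing the DataFrame names from my snippet above, so I may be missing something:

// keep only the join key from each side and deduplicate it before joining
val ingestedFilenames     = ingestedDF.select("filename").distinct
val consolidatedFilenames = consolidatedDF.select("fileName").distinct

// explicit equi-join on the key columns, then count the matches
ingestedFilenames
  .join(consolidatedFilenames, ingestedFilenames("filename") === consolidatedFilenames("fileName"))
  .count()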

PS

I was reading about the map-side join technique, but it looks like it would only work for me if one of the files were small enough to fit in RAM, and I can't guarantee that. So I would like to know the community's preferred way to achieve this.

http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
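If I understand that post correctly, the DataFrame equivalent would be a broadcast join, roughly like the sketch below (again using my DataFrame names); but it only helps if the broadcast side, even reduced to its key column, fits in memory, which I cannot guarantee:

import org.apache.spark.sql.functions.broadcast

// broadcast only the key column of the consolidated file; even that may be too big
val consolidatedKeys = broadcast(consolidatedDF.select("fileName").distinct)

ingestedDF
  .join(consolidatedKeys, ingestedDF("filename") === consolidatedKeys("fileName"))
  .count()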

Can't you calculate the counts per dataframe and then join and sum? – mtoto
I guess I can, @mtoto, but first I would like to know which is the best way to achieve this. Right now I have this statement running, ingestedDF.join(consolidatedDF).where($"filename" === $"fileName").count(), just to get the number; when the job is done, I will try your suggestion. How would you write that code? – aironman
Not sure what the question is: do you only want to know the count of common filenames in both datasets, or the difference? – maasg
The count of common filenames in both datasets, in an efficient way. – aironman
Are the filenames unique in both datasets? – maasg

1 Answer


I would approach this problem by stripping the data down to only the field I'm interested in (filename), and building a distinct set of those filenames tagged with the dataset they come from. At that point both intermediate datasets have the same schema, so we can union them and just count. This should be orders of magnitude faster than using a join on the complete data.

import org.apache.spark.sql.functions.lit
import sparkSession.implicits._  // needed for the $"..." column syntax used below

// prepare some random dataset
val data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble() < 0.8).map(i => (s"file$i", i, "rubbish"))
val data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble() < 0.7).map(i => (s"file$i", i, "crap"))

val df1 = sparkSession.createDataFrame(data1).toDF("filename", "index", "data")
val df2 = sparkSession.createDataFrame(data2).toDF("filename", "index", "data")

// select only the column we are interested in and tag it with the source.
// Let's make it distinct, as we are only interested in the unique file count
val df1Filenames = df1.select("filename").withColumn("df", lit("df1")).distinct
val df2Filenames = df2.select("filename").withColumn("df", lit("df2")).distinct

// union both dataframes
val union = df1Filenames.union(df2Filenames).toDF("filename", "source")

// let's count the occurrences of filename, by using a groupby operation
val occurrenceCount = union.groupBy("filename").count

// we're interested in the count of those files that appear in both datasets (with a count of 2)
occurrenceCount.filter($"count" === 2).count
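Since both intermediate sets are made distinct anyway, the same number can also be expressed as a plain intersect on the key column; I haven't benchmarked this variant, it is just a more compact way to write the same check:

// intersect keeps only the (distinct) filenames present in both DataFrames
val commonFilenames = df1.select("filename").intersect(df2.select("filename"))
commonFilenames.count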