
I want to understand the best way to solve date-related problems in Spark SQL. I'm trying to solve a simple problem where I have a file with date ranges like below:

startdate,enddate
01/01/2018,30/01/2018
01/02/2018,28/02/2018
01/03/2018,30/03/2018

and another table that has date and counts:

date,counts
03/01/2018,10
25/01/2018,15
05/02/2018,23
17/02/2018,43

Now all I want to find is the sum of counts for each date range, so the expected output is:

startdate,enddate,sum(counts)
01/01/2018,30/01/2018,25
01/02/2018,28/02/2018,66
01/03/2018,30/03/2018,0

Following is the code I have written, but it gives me a Cartesian result set:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DateBasedCount").master("local").getOrCreate()
import spark.implicits._

val df1 = spark.read.option("header","true").csv("dateRange.txt").toDF("startdate","enddate")
val df2 = spark.read.option("header","true").csv("dateCount").toDF("date","counts")

df1.createOrReplaceTempView("daterange")
df2.createOrReplaceTempView("datecount")

val res = spark.sql("select startdate, enddate, date, counts from daterange left join datecount on date >= startdate and date <= enddate")
res.show()

The output is:

| startdate|   enddate|      date|  counts|
|01/01/2018|30/01/2018|03/01/2018|      10|
|01/01/2018|30/01/2018|25/01/2018|      15|
|01/01/2018|30/01/2018|05/02/2018|      23|
|01/01/2018|30/01/2018|17/02/2018|      43|
|01/02/2018|28/02/2018|03/01/2018|      10|
|01/02/2018|28/02/2018|25/01/2018|      15|
|01/02/2018|28/02/2018|05/02/2018|      23|
|01/02/2018|28/02/2018|17/02/2018|      43|
|01/03/2018|30/03/2018|03/01/2018|      10|
|01/03/2018|30/03/2018|25/01/2018|      15|
|01/03/2018|30/03/2018|05/02/2018|      23|
|01/03/2018|30/03/2018|17/02/2018|      43|

Now if I group by startdate and enddate with a sum on counts, I see the following result, which is incorrect:

| startdate|   enddate|sum(counts)|
|01/01/2018|30/01/2018|       91.0|
|01/02/2018|28/02/2018|       91.0|
|01/03/2018|30/03/2018|       91.0|

So how do we handle this, and what is the best way to deal with dates in Spark SQL? Should we build the columns as DateType in the first place, or read them as strings and cast them to dates when necessary?

Are the start and end dates always one-month intervals? – Shaido
They need not be; this is just a sample dataset. – Gurupraveen

1 Answer


The problem is that Spark does not automatically interpret your dates as dates; they are just strings. The solution is therefore to convert them into actual dates:

import org.apache.spark.sql.functions._

val df1 = spark.read.option("header","true").csv("dateRange.txt")
  .toDF("startdate","enddate")
  .withColumn("startdate", to_date(unix_timestamp($"startdate", "dd/MM/yyyy").cast("timestamp")))
  .withColumn("enddate", to_date(unix_timestamp($"enddate", "dd/MM/yyyy").cast("timestamp")))
val df2 = spark.read.option("header","true").csv("dateCount")
  .toDF("date","counts")
  .withColumn("date", to_date(unix_timestamp($"date", "dd/MM/yyyy").cast("timestamp")))
  .withColumn("counts", $"counts".cast("int")) // make counts numeric so it can be summed later

Then use the same code as before. The output of the SQL command is now:

+----------+----------+----------+------+
| startdate|   enddate|      date|counts|
+----------+----------+----------+------+
|2018-01-01|2018-01-30|2018-01-03|    10|
|2018-01-01|2018-01-30|2018-01-25|    15|
|2018-02-01|2018-02-28|2018-02-05|    23|
|2018-02-01|2018-02-28|2018-02-17|    43|
|2018-03-01|2018-03-30|      null|  null|
+----------+----------+----------+------+

If the last line should be ignored, simply change to an inner join instead.
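
For example, an untested sketch that only swaps the join type in the query from the question:

val res = spark.sql("select startdate, enddate, date, counts from daterange inner join datecount on date >= startdate and date <= enddate")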

Using res.groupBy("startdate", "enddate").sum() on this new dataframe will give the wanted output, except that the empty March range will show null instead of 0.
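
If you want that null to become 0, as in the expected output in the question, one option is to wrap the sum in coalesce. A minimal sketch, assuming the res dataframe from the query above and the numeric counts column:

// Sum the counts per date range; coalesce turns the null sum of an empty range into 0
val result = res.groupBy("startdate", "enddate")
  .agg(coalesce(sum($"counts"), lit(0)).as("sum_counts"))
result.show()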