0 votes

I'm working on averaging some temperature values in Scala, trying to find the coldest temperature for each hourly period. Then I want to select the period with the coldest temperature and the most occurrences throughout the dataset.

My code seems to be doing the calculations and windowing correctly; the only issue is correctly outputting the timing window. The code below extracts the individual columns from my input file (month, day, year, and time are all separate columns) and concatenates them into a single column.

My code

val data = osh.select(col("TemperatureF"), concat(format_string("%02d", col("Month")), lit("/"), format_string("%02d", col("Day")), lit("/"), col("Year"), lit(" "), col("TimeCST")).as("Date"))
data.show()

Sample output from the above code:

+------------+-------------------+
|TemperatureF|               Date|
+------------+-------------------+
|        35.1|01/01/2000 12:53 AM|
|        35.1| 01/01/2000 1:53 AM|
|        35.1| 01/01/2000 2:53 AM|
|        34.0| 01/01/2000 3:53 AM|
|        32.0| 01/01/2000 4:53 AM|
|        30.9| 01/01/2000 5:53 AM|
|        28.0| 01/01/2000 6:53 AM|
+------------+-------------------+

When processing this input, I'm attempting to convert the Date column from a string (because it's concatenated) to a timestamp of the form MM/dd/yyyy hh:mm a. My code currently seems to convert my timestamp to a 24-hour scale. From the reading I've done, hh instead of HH should refer to a 12-hour clock, and the a should add the AM/PM marker. Any suggestions on how to fix this?
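
Here's a minimal one-row reproduction of what I mean (hypothetical demo data; assuming a spark-shell session on Spark 2.x, where spark and its implicits are in scope and the lenient SimpleDateFormat parser accepts the unpadded hour):

import org.apache.spark.sql.functions._
import spark.implicits._

// The string parses fine, but show() renders the resulting TimestampType
// column in 24-hour "yyyy-MM-dd HH:mm:ss" form regardless of the pattern
// used to parse it.
val demo = Seq("01/01/2000 1:53 AM").toDF("Date")
demo.withColumn("ts", to_timestamp($"Date", "MM/dd/yyyy hh:mm a")).show(false)
// +------------------+-------------------+
// |Date              |ts                 |
// +------------------+-------------------+
// |01/01/2000 1:53 AM|2000-01-01 01:53:00|
// +------------------+-------------------+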

Full Code:

import org.apache.spark.sql.functions._
import spark.implicits._

val data = osh.select(col("TemperatureF"), concat(format_string("%02d", col("Month")), lit("/"), format_string("%02d", col("Day")), lit("/"), col("Year"), lit(" "), col("TimeCST")).as("Date"))
data.show()

val ts = to_timestamp($"Date", "MM/dd/yyyy hh:mm a")
val mydata = data.withColumn("ts", ts)
mydata.show()

val groupByWindow = mydata.groupBy(window(col("ts"), "1 hour")).agg(avg("TemperatureF").as("avgTemp")).select("window.start", "window.end", "avgTemp")
groupByWindow.show()

val daily = groupByWindow
  .withColumn("_tmp", split($"start", " "))
  .select($"_tmp".getItem(0).as("Date"), $"_tmp".getItem(1).as("StartTime"), $"end", $"avgTemp")
  .withColumn("_tmp2", split($"end", " "))
  .select($"Date", $"StartTime", $"_tmp2".getItem(1).as("EndTime"), $"avgTemp")

daily.createOrReplaceTempView("myview")


spark.sqlContext.sql("Select StartTime,EndTime,avg(avgTemp) avgTemp,count(*) Total from myview group by StartTime,EndTime order by avgTemp ASC, total DESC").show()

Current Output:

+---------+--------+-------------------+-----+
|StartTime| EndTime|            avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00:00|11:00:00|-16.314026481823376| 5726|
| 11:00:00|12:00:00|-3.8934910974897816| 5710|
| 09:00:00|10:00:00|  22.41515848657947| 5702|
| 23:00:00|00:00:00|  34.76578133360086| 5696|
+---------+--------+-------------------+-----+

Expected Output:

+---------+--------+-------------------+-----+
|StartTime| EndTime|            avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00 AM|11:00 AM|-16.314026481823376| 5726|
| 11:00 AM|12:00 PM|-3.8934910974897816| 5710|
| 09:00 AM|10:00 AM|  22.41515848657947| 5702|
| 11:00 PM|12:00 AM|  34.76578133360086| 5696|
+---------+--------+-------------------+-----+
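
Once the formatting is sorted out, selecting the winning period should just be the first row of that ordered result. A sketch of what I have in mind (assigning the query to a val instead of calling .show() on it; the name ranked is my own):

val ranked = spark.sqlContext.sql("Select StartTime,EndTime,avg(avgTemp) avgTemp,count(*) Total from myview group by StartTime,EndTime order by avgTemp ASC, Total DESC")
ranked.limit(1).show() // the coldest, most frequently occurring hourly window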
Why don't you try it this way: val daily = groupByWindow.withColumn("_tmp",split($"start"," ")).select($"_tmp".getItem(0).as("Date"),date_format($"_tmp".getItem(1),"hh:mm:ss a").as("StartTime"),$"end",$"avgTemp").withColumn("_tmp2",split($"end"," ")).select($"Date",$"StartTime",date_format($"_tmp2".getItem(1),"hh:mm:ss a").as("EndTime"),$"avgTemp") - that is, by adding date_format. - Goutam Pradhan
@GoutamPradhan This works great. Thanks a lot. - peakstatus

2 Answers

1 vote

You need to convert your StartTime and EndTime strings in daily to timestamps and then back to strings with the required format.

The last four withColumn calls are added to your original code:

  val daily = groupByWindow
    .withColumn("_tmp", split($"start", " "))
    .select($"_tmp".getItem(0).as("Date"), $"_tmp".getItem(1).as("StartTime"), $"end", $"avgTemp")
    .withColumn("_tmp2", split($"end", " "))
    .select($"Date", $"StartTime", $"_tmp2".getItem(1).as("EndTime"), $"avgTemp")
    // parse the "HH:mm:ss" strings to epoch seconds, then render them back as strings in "HH:mm a" form
    .withColumn("StartTime", unix_timestamp($"StartTime", "HH:mm:ss"))
    .withColumn("StartTime", from_unixtime($"StartTime", "HH:mm a"))
    .withColumn("EndTime", unix_timestamp($"EndTime", "HH:mm:ss"))
    .withColumn("EndTime", from_unixtime($"EndTime", "HH:mm a"))

And then the next steps that you already have:

daily.createOrReplaceTempView("myview")

spark.sqlContext.sql("Select StartTime,EndTime,avg(avgTemp) avgTemp,count(*) Total from myview group by StartTime,EndTime order by avgTemp ASC, total DESC").show()

Output

+---------+--------+-------+-----+
|StartTime| EndTime|avgTemp|Total|
+---------+--------+-------+-----+
| 06:00 AM|07:00 AM|   28.0|    1|
| 05:00 AM|06:00 AM|   30.9|    1|
| 04:00 AM|05:00 AM|   32.0|    1|
| 03:00 AM|04:00 AM|   34.0|    1|
| 00:00 AM|01:00 AM|   35.1|    1|
| 01:00 AM|02:00 AM|   35.1|    1|
| 02:00 AM|03:00 AM|   35.1|    1|
+---------+--------+-------+-----+
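
A side note on the pattern: HH is the 0-23 hour-of-day field, which is why midnight shows up as 00:00 AM above; if you want a strict 12-hour clock, use lowercase hh in the output format instead. A quick check (assuming a UTC session timezone, so epoch second 0 is midnight):

spark.sql("SELECT from_unixtime(0, 'HH:mm a') AS h24, from_unixtime(0, 'hh:mm a') AS h12").show()
// +--------+--------+
// |     h24|     h12|
// +--------+--------+
// |00:00 AM|12:00 AM|
// +--------+--------+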
1 vote

Using date_format() will solve your problem:

scala> val daily = groupByWindow.withColumn("_tmp",split($"start"," ")).select($"_tmp".getItem(0).as("Date"),date_format($"_tmp".getItem(1),"hh:mm:ss a").as("StartTime"),$"end",$"avgTemp").withColumn("_tmp2",split($"end"," ")).select($"Date",$"StartTime",date_format($"_tmp2".getItem(1),"hh:mm:ss a").as("EndTime"),$"avgTemp")
scala> daily.show
+----------+-----------+-----------+-------+
|      Date|  StartTime|    EndTime|avgTemp|
+----------+-----------+-----------+-------+
|2000-01-01|01:30:00 AM|02:30:00 AM|   35.1|
|2000-01-01|03:30:00 AM|04:30:00 AM|   34.0|
|2000-01-01|02:30:00 AM|03:30:00 AM|   35.1|
|2000-01-01|06:30:00 AM|07:30:00 AM|   28.0|
|2000-01-01|12:30:00 AM|01:30:00 AM|   35.1|
|2000-01-01|05:30:00 AM|06:30:00 AM|   30.9|
|2000-01-01|04:30:00 AM|05:30:00 AM|   32.0|
+----------+-----------+-----------+-------+