I'm working on averaging temperature values in Scala, trying to find the coldest temperature for each hour period. I then want to select the hour period that has the coldest temperature and the greatest number of occurrences across the dataset.
My code seems to be doing the calculations and windowing correctly; the only issue is outputting the timing window in the right format. The code below extracts the individual columns from my input file (month, day, year, and time are all separate columns) and concatenates them into a single column.
My code
val data = osh.select(col("TemperatureF"),
  concat(format_string("%02d", col("Month")), lit("/"),
    format_string("%02d", col("Day")), lit("/"),
    col("Year"), lit(" "), col("TimeCST")).as("Date"))
data.show()
Sample input from above code:
+------------+-------------------+
|TemperatureF| Date|
+------------+-------------------+
| 35.1|01/01/2000 12:53 AM|
| 35.1| 01/01/2000 1:53 AM|
| 35.1| 01/01/2000 2:53 AM|
| 34.0| 01/01/2000 3:53 AM|
| 32.0| 01/01/2000 4:53 AM|
| 30.9| 01/01/2000 5:53 AM|
| 28.0| 01/01/2000 6:53 AM|
+------------+-------------------+
When processing this input, I'm attempting to convert it from a string (because it's a concatenation) to a timestamp of the form MM/dd/yyyy hh:mm a. My code currently seems to convert the timestamp to a 24-hour scale. From the reading I've done, hh (instead of HH) should refer to the 12-hour clock, and a should add the AM/PM marker. Any suggestions on how to fix this?
Full Code:
import org.apache.spark.sql.functions._
import spark.implicits._

// Build a single "Date" string from the separate month/day/year/time columns.
val data = osh.select(col("TemperatureF"),
  concat(format_string("%02d", col("Month")), lit("/"),
    format_string("%02d", col("Day")), lit("/"),
    col("Year"), lit(" "), col("TimeCST")).as("Date"))
data.show()

// Parse the concatenated string as a 12-hour-clock timestamp.
val ts = to_timestamp($"Date", "MM/dd/yyyy hh:mm a")
val mydata = data.withColumn("ts", ts)
mydata.show()

// Average temperature per one-hour window.
val groupByWindow = mydata.groupBy(window(col("ts"), "1 hour"))
  .agg(avg("TemperatureF").as("avgTemp"))
  .select("window.start", "window.end", "avgTemp")
groupByWindow.show()

// Split each window boundary into its date and time-of-day parts.
val daily = groupByWindow.withColumn("_tmp", split($"start", " "))
  .select($"_tmp".getItem(0).as("Date"), $"_tmp".getItem(1).as("StartTime"), $"end", $"avgTemp")
  .withColumn("_tmp2", split($"end", " "))
  .select($"Date", $"StartTime", $"_tmp2".getItem(1).as("EndTime"), $"avgTemp")

daily.createOrReplaceTempView("myview")
spark.sqlContext.sql("select StartTime, EndTime, avg(avgTemp) avgTemp, count(*) Total from myview group by StartTime, EndTime order by avgTemp asc, Total desc").show()
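One detail worth noting about the code above: window() produces timestamp columns, and splitting $"start" on a space implicitly casts the timestamp to its default string rendering, which is always the 24-hour yyyy-MM-dd HH:mm:ss form regardless of the pattern used when parsing. Getting AM/PM text back requires an explicit re-format of the boundary (in Spark, date_format(col("start"), "hh:mm a") would be the analogous call). A sketch of the difference using java.time, with an assumed boundary value:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

// A hypothetical window boundary: 23:00 on Jan 1, 2000.
val boundary = LocalDateTime.of(2000, 1, 1, 23, 0)

// Default 24-hour rendering: this shape matches the "Current Output" column.
val asDefault = boundary.format(DateTimeFormatter.ofPattern("HH:mm:ss"))

// Explicit 12-hour re-format with the AM/PM marker.
val asTwelveHour = boundary.format(DateTimeFormatter.ofPattern("hh:mm a", Locale.US))
```

Here asDefault is "23:00:00" while asTwelveHour is "11:00 PM", which is why the parsed-with-hh timestamps still print in 24-hour form once stringified.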
Current Output:
+---------+--------+-------------------+-----+
|StartTime| EndTime| avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00:00|11:00:00|-16.314026481823376| 5726|
| 11:00:00|12:00:00|-3.8934910974897816| 5710|
| 09:00:00|10:00:00| 22.41515848657947| 5702|
| 23:00:00|00:00:00| 34.76578133360086| 5696|
+---------+--------+-------------------+-----+
Expected Output:
+---------+--------+-------------------+-----+
|StartTime| EndTime| avgTemp|Total|
+---------+--------+-------------------+-----+
| 10:00 AM|11:00 AM|-16.314026481823376| 5726|
| 11:00 AM|12:00 PM|-3.8934910974897816| 5710|
| 09:00 AM|10:00 AM| 22.41515848657947| 5702|
| 11:00 PM|12:00 AM| 34.76578133360086| 5696|
+---------+--------+-------------------+-----+