I have a DataFrame with the following schema:

root
 |-- eventTimestamp: long 
 |-- trackingId: string 
 |-- voyageStatus: string 

Here are some sample rows:

+--------------+----------+------------+
|eventTimestamp|trackingId|voyageStatus|
+--------------+----------+------------+
|          504 |78911c81  |COMPLETE    |
|          504 |3b77a150  |ACTIVE      |
|          390 |ece6c8d0  |ACTIVE      |
|          390 |78911c81  |ACTIVE      |
|          349 |3b77a150  |ACTIVE      |
|          349 |ece6c8d0  |ACTIVE      |
|          349 |78911c81  |ACTIVE      |
|          350 |3b77a150  |ACTIVE      |
|          350 |ece6c8d0  |ACTIVE      |
|          350 |78911c81  |ACTIVE      |
|          351 |3b77a150  |ACTIVE      |
|          351 |ece6c8d0  |ACTIVE      |
|          351 |78911c81  |ACTIVE      |
|          352 |3b77a150  |ACTIVE      |
|          352 |ece6c8d0  |ACTIVE      |
|          352 |78911c81  |ACTIVE      |
|          507 |3b77a150  |COMPLETE    |
|          349 |ece6c8d0  |ACTIVE      |
|          349 |78911c81  |ACTIVE      |
|          349 |3b77a150  |ACTIVE      |
+--------------+----------+------------+

I want to add a new column of type long called completionEventTimestamp. For each row, this column will have the following value:

  1. If there is a record with the same trackingId as the current row with the value of voyageStatus equal to "COMPLETE", then the value will be the eventTimestamp of that record.
  2. Otherwise, the value will be -1 (so the row can be filtered out later).

Here is what the transformation would yield for the example above:

+--------------+----------+------------+------------------------+
|eventTimestamp|trackingId|voyageStatus|completionEventTimestamp|
+--------------+----------+------------+------------------------+
|          504 |78911c81  |COMPLETE    |                     504|
|          504 |3b77a150  |ACTIVE      |                     507|
|          390 |ece6c8d0  |ACTIVE      |                      -1|
|          390 |78911c81  |ACTIVE      |                     504|
|          349 |3b77a150  |ACTIVE      |                     507|
|          349 |ece6c8d0  |ACTIVE      |                      -1|
|          349 |78911c81  |ACTIVE      |                     504|
|          350 |3b77a150  |ACTIVE      |                     507|
|          350 |ece6c8d0  |ACTIVE      |                      -1|
|          350 |78911c81  |ACTIVE      |                     504|
|          351 |3b77a150  |ACTIVE      |                     507|
|          351 |ece6c8d0  |ACTIVE      |                      -1|
|          351 |78911c81  |ACTIVE      |                     504|
|          352 |3b77a150  |ACTIVE      |                     507|
|          352 |ece6c8d0  |ACTIVE      |                      -1|
|          352 |78911c81  |ACTIVE      |                     504|
|          507 |3b77a150  |COMPLETE    |                     507|
|          349 |ece6c8d0  |ACTIVE      |                      -1|
|          349 |78911c81  |ACTIVE      |                     504|
|          349 |3b77a150  |ACTIVE      |                     507|
+--------------+----------+------------+------------------------+

In case it helps: a given trackingId has at most one record with a voyageStatus of "COMPLETE", and that record is the last one for that trackingId when ordered by eventTimestamp.

2 Answers

0
votes
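import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// training3 is the input DataFrame; the label column name below is an assumption.
val LABEL_COL_NAME = "completionEventTimestamp"

// Collect (trackingId, eventTimestamp) for every COMPLETE record to the driver.
val completedVoyagesDF = training3.filter(training3("voyageStatus") === "COMPLETE").select("trackingId", "eventTimestamp")
val completedVoyagesArray = completedVoyagesDF.collect().map({
  row: Row => row.getString(0) -> row.getLong(1)
})
val trackingIDToActualArrivalTime = completedVoyagesArray.toMap

// Look up the completion timestamp for a trackingId, or -1 if it never completed.
val arrivalTime: (String => Long) = (trackingId: String) => {
  trackingIDToActualArrivalTime.getOrElse(trackingId, -1L)
}
val arrivalTimeFunc = udf(arrivalTime)
val withActualArrivalTimeDF = training3.withColumn(LABEL_COL_NAME, arrivalTimeFunc(col("trackingId")))
// Drop the rows whose voyage never completed.
val training4 = withActualArrivalTimeDF.filter(withActualArrivalTimeDF(LABEL_COL_NAME) =!= -1)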
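
Because collect() pulls every completed voyage onto the driver, this may not scale to a very large number of trackingIds. A possible alternative, sketched below, keeps the lookup inside Spark by joining against the COMPLETE records. The small training3 here is a hypothetical stand-in for the real DataFrame, the SparkSession setup is only there to make the snippet self-contained, and the broadcast hint assumes the set of completed voyages is small enough to ship to every executor.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col}

val spark = SparkSession.builder().master("local[*]").appName("completed-join").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for training3, using the schema from the question.
val training3 = Seq(
  (504L, "78911c81", "COMPLETE"),
  (390L, "78911c81", "ACTIVE"),
  (390L, "ece6c8d0", "ACTIVE"),
  (349L, "3b77a150", "ACTIVE"),
  (507L, "3b77a150", "COMPLETE")
).toDF("eventTimestamp", "trackingId", "voyageStatus")

// One row per completed voyage: (trackingId, completionEventTimestamp).
val completed = training3
  .filter(col("voyageStatus") === "COMPLETE")
  .select(col("trackingId"), col("eventTimestamp").as("completionEventTimestamp"))

// An inner join keeps only rows whose trackingId has a COMPLETE record,
// so no -1 placeholder and no follow-up filter are needed.
val training4 = training3.join(broadcast(completed), Seq("trackingId"), "inner")

training4.show()
```

With an inner join the uncompleted voyages (ece6c8d0 in this sample) drop out directly. Switching the join type to "left_outer" would keep them with a null completionEventTimestamp instead.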
0
votes

You can use collect_list over window partitions to gather the list of statuses per trackingId, and a UDF to conditionally assign a value to completionEventTimestamp, as shown below:

// Note: toDF and the $"..." syntax need spark.implicits._ (already imported in spark-shell)
val df = Seq(
  (504L, 10, "ACTIVE"),
  (506L, 10, "ACTIVE"),
  (510L, 10, "COMPLETE"),
  (390L, 11, "ACTIVE"),
  (395L, 11, "ACTIVE"),
  (398L, 11, "ACTIVE"),
  (352L, 12, "ACTIVE"),
  (360L, 12, "COMPLETE")
).toDF("eventTimestamp", "trackingId", "voyageStatus")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Save "completeTimestamp" for every row with COMPLETE status
val df2 = df.select(
  $"eventTimestamp".as("completeTimestamp"), $"trackingId"
).where(df("voyageStatus") === "COMPLETE")

// Create a "statusList" per "trackingId" for each row using collect_list over window partitions
val window = Window.partitionBy("trackingId")
val df3 = df.withColumn("statusList", collect_list("voyageStatus").over(window))

// A UDF to check whether statusList contains "COMPLETE"
val checkComplete = udf(
  (l: Seq[String]) => l.contains("COMPLETE")
)

// Join df3 with df2 and apply the UDF to assemble "completionEventTimestamp"
val df4 = df3.join(df2, Seq("trackingId"), "left_outer").
  withColumn(
    "completionEventTimestamp",
    when(checkComplete($"statusList"), $"completeTimestamp").otherwise(-1L)
  ).select(
    "eventTimestamp", "trackingId", "voyageStatus", "completionEventTimestamp"
  )

df4.show
+--------------+----------+------------+------------------------+
|eventTimestamp|trackingId|voyageStatus|completionEventTimestamp|
+--------------+----------+------------+------------------------+
|           352|        12|      ACTIVE|                     360|
|           360|        12|    COMPLETE|                     360|
|           504|        10|      ACTIVE|                     510|
|           506|        10|      ACTIVE|                     510|
|           510|        10|    COMPLETE|                     510|
|           390|        11|      ACTIVE|                      -1|
|           395|        11|      ACTIVE|                      -1|
|           398|        11|      ACTIVE|                      -1|
+--------------+----------+------------+------------------------+
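
The same result can probably be reached without the UDF or the join. The sketch below is one way to do it, assuming (as the question states) that each trackingId has at most one COMPLETE record. It takes the maximum COMPLETE eventTimestamp over the trackingId window and falls back to -1 when there is none. The SparkSession setup and the name byTrackingId are only there to make the snippet self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, max, when}

val spark = SparkSession.builder().master("local[*]").appName("completion-ts").getOrCreate()
import spark.implicits._

val df = Seq(
  (504L, 10, "ACTIVE"),
  (506L, 10, "ACTIVE"),
  (510L, 10, "COMPLETE"),
  (390L, 11, "ACTIVE"),
  (395L, 11, "ACTIVE"),
  (398L, 11, "ACTIVE"),
  (352L, 12, "ACTIVE"),
  (360L, 12, "COMPLETE")
).toDF("eventTimestamp", "trackingId", "voyageStatus")

// All rows that share a trackingId fall into the same window partition.
val byTrackingId = Window.partitionBy("trackingId")

// when(...) without otherwise yields null for non-COMPLETE rows, and max ignores nulls,
// so the aggregate is the COMPLETE timestamp if one exists and null otherwise.
// coalesce then replaces that null with -1.
val result = df.withColumn(
  "completionEventTimestamp",
  coalesce(
    max(when(col("voyageStatus") === "COMPLETE", col("eventTimestamp"))).over(byTrackingId),
    lit(-1L)
  )
)

result.show()
```

Built-in column functions are generally easier for Spark to optimize than UDFs, which is the main reason to prefer this form. If the join is kept instead, coalesce($"completeTimestamp", lit(-1L)) on the left_outer join result should be enough on its own, since unmatched rows already carry a null.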