
I'm trying to do a conditional explode in Spark Structured Streaming.

For instance, my streaming DataFrame looks like the following (I'm totally making the data up here). I want to explode the Employees array into separate rows of arrays when Contingent = 1. When Contingent = 0, I need to leave the array as is.

|----------------|---------------------|------------------|
|     Dept ID    |     Employees       |    Contingent    |
|----------------|---------------------|------------------|
|          1     | ["John", "Jane"]    |       1          |
|----------------|---------------------|------------------|
|          4     | ["Amy", "James"]    |       0          |
|----------------|---------------------|------------------|
|          2     | ["David"]           |       1          |
|----------------|---------------------|------------------|

So, my output should look like this (I do not need to display the Contingent column):

|----------------|---------------------|
|     Dept ID    |     Employees       |
|----------------|---------------------|
|          1     | ["John"]            |
|----------------|---------------------|
|          1     | ["Jane"]            |
|----------------|---------------------|
|          4     | ["Amy", "James"]    |
|----------------|---------------------|
|          2     | ["David"]           |
|----------------|---------------------|

There are a couple of challenges I'm currently facing:

  1. Exploding arrays conditionally
  2. Exploding arrays into arrays (rather than into individual strings, which is what a plain explode produces)

In Hive, there was the concept of a UDTF (user-defined table function) that would let me do this. Is there anything comparable in Spark?


Answer


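Use flatMap on a typed Dataset to do the explode yourself; inside the function you can apply whatever condition you want and emit zero, one, or many rows per input row.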

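import spark.implicits._ // provides the encoders needed by df.as[...] and flatMap

// Define the case classes at the top level (not inside a method) so Spark can derive encoders.
// Field names must match the column names, so rename "Dept ID" to Dept_ID first if needed.
case class Department(Dept_ID: String, Employees: Array[String], Contingent: Int)
case class DepartmentExp(Dept_ID: String, Employees: Array[String])

val ds = df.as[Department]

// flatMap on a typed Dataset already returns Dataset[DepartmentExp], so no trailing .as is needed
val exploded = ds.flatMap { dept =>
  if (dept.Contingent == 1) {
    // One output row per employee, each wrapped in a single-element array
    dept.Employees.map(emp => DepartmentExp(dept.Dept_ID, Array(emp)))
  } else {
    // Leave the array as is: exactly one output row
    Array(DepartmentExp(dept.Dept_ID, dept.Employees))
  }
}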
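If you would rather stay in the untyped DataFrame API (no case classes or encoders), the same conditional explode can probably be expressed with built-in functions: when picks, per row, an array of arrays, and explode turns each outer element into a row. This is a sketch under a few assumptions: Spark 3.0+ (for the Scala transform function), columns named Dept_ID, Employees (array<string>) and Contingent (int), and a static Seq(...).toDF input standing in for the stream. The helper name conditionalExplode is hypothetical. Because it is a stateless projection, the same function should apply unchanged to a streaming DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode, transform, when}

object ConditionalExplode {
  // Hypothetical helper; assumes columns Dept_ID, Employees (array<string>), Contingent (int).
  def conditionalExplode(df: DataFrame): DataFrame = {
    // Both branches yield array<array<string>> so the `when` expression type-checks:
    //   Contingent = 1 -> wrap each employee in its own one-element array
    //   otherwise      -> wrap the whole array as a single element
    val wrapped =
      when(col("Contingent") === 1, transform(col("Employees"), e => array(e)))
        .otherwise(array(col("Employees")))

    // explode emits one row per outer element; Contingent is dropped by the select.
    df.select(col("Dept_ID"), explode(wrapped).as("Employees"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("conditional-explode")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Static stand-in for the streaming source, using the sample data from the question.
    val df = Seq(
      (1, Seq("John", "Jane"), 1),
      (4, Seq("Amy", "James"), 0),
      (2, Seq("David"), 1)
    ).toDF("Dept_ID", "Employees", "Contingent")

    conditionalExplode(df).show(truncate = false)
    spark.stop()
  }
}
```

One behavioral note: for a Contingent = 1 row whose array is empty, explode emits no rows, which matches the flatMap version above. If you need to keep such rows (with a null Employees value), explode_outer can be used in place of explode.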