
I have a DataFrame with the structure:

(List[String], String)

An example of two rows could be ([a, b, c], d) and ([d, e], a). I want to transform these rows into ([a, b, c], [d, e]) and ([d, e], [a, b, c]).

The column names of the DataFrame are "src" and "dst".
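
For reference, a minimal sketch of a DataFrame with this shape, built from the example rows above, would be:

// assuming a SparkSession available as `spark` (as in spark-shell)
import spark.implicits._

val df = Seq(
  (List("a", "b", "c"), "d"),
  (List("d", "e"), "a")
).toDF("src", "dst")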

How can I approach this problem?

What I've tried:

val result = df.map(f => {
  if(df.exists(x => x._1.contains(f._2))) {
    (f._1, df.filter(x => x._1.contains(f._2)).head._1)
  } else {
    (f._1, List(f._2))
  }
}).toDF("src", "dst")

However, this solution gives me the following error:

java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2740)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Is there a more efficient way to do this?

Can you post some code that you have tried? - Sid
Sure, I'll edit the post - Rink Stiekema
What if you have more than two rows in your dataframe? - Ramesh Maharjan
It should just take one eventually, but the first is fine since, if there are two matches, they are identical (that's the way the data is structured) - Rink Stiekema

1 Answer


As far as I understand from your question and the comments above, the following can be a solution.

Given input dataframe as

+---------+---+
|src      |dst|
+---------+---+
|[a, b, c]|d  |
|[d, e]   |a  |
+---------+---+

You can use a self join with a udf function as follows:

import org.apache.spark.sql.functions._
import scala.collection.mutable

// returns true when the array in the first column contains the string in the second column
val joinExpr = udf((col1: mutable.WrappedArray[String], col2: String) => col1.contains(col2))

df.as("t1").join(df.as("t2"), joinExpr($"t1.src", $"t2.dst")).select($"t1.src".as("src"), $"t2.src".as("dst")).show(false)

to get the final output as

+---------+---------+
|src      |dst      |
+---------+---------+
|[a, b, c]|[d, e]   |
|[d, e]   |[a, b, c]|
+---------+---------+
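
If you prefer to avoid the udf, the same self join can be sketched with the built-in array_contains function (assuming a Spark version whose array_contains accepts a column for the value argument):

import org.apache.spark.sql.functions.array_contains

// same self join, but the containment check uses the built-in array_contains
// instead of a user-defined function
df.as("t1")
  .join(df.as("t2"), array_contains($"t1.src", $"t2.dst"))
  .select($"t1.src".as("src"), $"t2.src".as("dst"))
  .show(false)

Note that both versions are non-equi joins, so every pair of rows is compared; that is fine for small dataframes but can get expensive as the data grows.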

Hope the answer is helpful