
I have a DataFrame with the following structure:

(List[String], String)

An example of two rows could be ([a, b, c], d) and ([d, e], a). I want to transform these rows into ([a, b, c], [d, e]) and ([d, e], [a, b, c]) respectively.

The column names of the dataframe are "src" and "dst".

How can I approach this problem?
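For reference, a minimal way to build such a sample DataFrame (assuming a local SparkSession named spark; the names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Two sample rows with the structure (List[String], String)
val df = Seq(
  (List("a", "b", "c"), "d"),
  (List("d", "e"), "a")
).toDF("src", "dst")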

What I've tried:

val result = df.map(f => {
  if(df.exists(x => x._1.contains(f._2))) {
    (f._1, df.filter(x => x._1.contains(f._2)).head._1)
  } else {
    (f._1, List(f._2))
  }
}).toDF("src", "dst")

However, this solution gives me the following error:

java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2740)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

There must be a more efficient way?

Can you post some code that you have tried? – Sid
Sure, I'll edit the post. – Rink Stiekema
What if you have more than two rows in your dataframe? – Ramesh Maharjan
It should just take one eventually, but the first is okay since if there are two, those are identical (that's the way the data is structured). – Rink Stiekema

1 Answer


As far as I understand from your question and the comments above, the following can be a solution. Your own attempt fails because it references df inside the map closure: a DataFrame cannot be used inside a transformation of another DataFrame, and Spark's attempt to serialize it into the executor tasks is what leads to the error you see. A self-join avoids that.

Given the input dataframe as

+---------+---+
|src      |dst|
+---------+---+
|[a, b, c]|d  |
|[d, e]   |a  |
+---------+---+

You can use a self-join with a udf function as the join condition. The udf checks whether one row's src array contains the other row's dst value:

import scala.collection.mutable
import org.apache.spark.sql.functions._

// Returns true when the src array contains the given dst value
val joinExpr = udf((col1: mutable.WrappedArray[String], col2: String) => col1.contains(col2))

// Self-join: pair each row with the row whose dst is contained in its src
df.as("t1")
  .join(df.as("t2"), joinExpr($"t1.src", $"t2.dst"))
  .select($"t1.src".as("src"), $"t2.src".as("dst"))
  .show(false)

to get the final output as

+---------+---------+
|src      |dst      |
+---------+---------+
|[a, b, c]|[d, e]   |
|[d, e]   |[a, b, c]|
+---------+---------+
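Note that the inner join drops any row whose dst is not contained in any other row's src. If you also want the fallback from your original attempt (keeping such rows with dst wrapped in a one-element list), a left join combined with coalesce and array should work. A sketch, assuming the same df and joinExpr as above (the argument order here looks up each row's dst in the other rows' src, mirroring your map logic):

// Keep unmatched rows, falling back to a one-element array built from dst
df.as("t1")
  .join(df.as("t2"), joinExpr($"t2.src", $"t1.dst"), "left")
  .select($"t1.src".as("src"), coalesce($"t2.src", array($"t1.dst")).as("dst"))
  .show(false)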

Hope the answer is helpful.