df1 = spark.createDataFrame([(1, [4, 2]), (4, [3, 2])], ["col2", "col4"])

     +----+------+
     |col2|  col4|
     +----+------+
     |   1|[4, 2]|
     |   4|[3, 2]|
     +----+------+



   df = spark.createDataFrame([("a",1,10), ("a",2,20), ("a",3,30),
        ("b",4,40), ("b",5,40), ("b",1,40)], ["col1", "col2", "col3"])

   +----+----+----+
   |col1|col2|col3|
   +----+----+----+
   |   a|   1|  10|
   |   a|   2|  20|
   |   a|   3|  30|
   |   b|   4|  40|
   |   b|   5|  40|
   |   b|   1|  40|
   +----+----+----+

I want to join df and df1 on col2 and, where they match, check whether the value of col2 is contained in the col4 array, grouped by col1. I am expecting the output below. Can someone tell me how to do this self join in PySpark (check whether col2 is in col4, grouped by col1)?

expected output


   col1   col2   col3
   a      1      10

2 Answers

  import org.apache.spark.sql.functions.{col, explode}
  import spark.implicits._  // for .toDF on Seq

  val df1 = Seq((1, List(4, 2)), (4, List(3, 2))).toDF("col2", "col4")
  val df = Seq(("a",1,10), ("a",2,20), ("a",3,30),
    ("b",4,40), ("b",5,40), ("b",1,40)).toDF("col1", "col2", "col3")


  // join df with df1 on col2, keeping only df's columns
  val res1DF = df1.join(df, df1.col("col2") === df.col("col2"), "inner")
    .select(
      df.col("col1"),
      df.col("col2"),
      df.col("col3")
    )

  res1DF.show(false)
  //  +----+----+----+
  //  |col1|col2|col3|
  //  +----+----+----+
  //  |a   |1   |10  |
  //  |b   |4   |40  |
  //  |b   |1   |40  |
  //  +----+----+----+

  // explode col4 so each array element becomes its own row
  val df11 = df1.withColumn("col41", explode(col("col4")))

  // keep only rows of res1DF whose col2 appears among the exploded col4 values
  val res2DF = res1DF.join(df11, df11.col("col41") === res1DF.col("col2"), "inner")
    .select(
      res1DF.col("col1"),
      res1DF.col("col2"),
      res1DF.col("col3")
    )
  res2DF.show(false)
  //  +----+----+----+
  //  |col1|col2|col3|
  //  +----+----+----+
  //  |b   |4   |40  |
  //  +----+----+----+
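
The question asks for PySpark, so here is a minimal PySpark sketch of the same two-step approach, assuming df and df1 are defined as in the question (the names res1, df11, res2, and the aliases are illustrative):

    from pyspark.sql import functions as F

    # step 1: inner join on col2, keeping only df's columns
    res1 = (df.alias("d")
              .join(df1.alias("d1"), F.col("d.col2") == F.col("d1.col2"), "inner")
              .select("d.col1", "d.col2", "d.col3"))

    # step 2: explode col4 and join again, so only rows whose col2
    # also appears inside some col4 array survive
    df11 = df1.withColumn("col41", F.explode("col4"))
    res2 = (res1.alias("r")
                .join(df11.alias("e"), F.col("r.col2") == F.col("e.col41"), "inner")
                .select("r.col1", "r.col2", "r.col3"))
    res2.show()
    # +----+----+----+
    # |col1|col2|col3|
    # +----+----+----+
    # |   b|   4|  40|
    # +----+----+----+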

You need to use array_contains here, which returns true or false based on whether the array contains the given value:

from pyspark.sql import functions as F

df = df.join(df1, "col2", "left")

df = df.withColumn("is_available", F.expr("array_contains(col4, col2)"))
df = df.filter(F.col("is_available"))  # in case you need only matched cases
df.show()
+----+----+----+---------+------------+
|col2|col1|col3|     col4|is_available|
+----+----+----+---------+------------+
|   1|   a|  10|[4, 2, 1]|        true|
|   1|   b|  40|[4, 2, 1]|        true|
+----+----+----+---------+------------+
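
If you only need the matched rows in the question's column order, the same join and check can be chained into one expression (a sketch reusing df, df1, and the functions import from above; result is an illustrative name):

    result = (df.join(df1, "col2", "left")
                .filter(F.expr("array_contains(col4, col2)"))
                .select("col1", "col2", "col3"))
    result.show()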

------ Observation on your question ------

Based on the given data, performing the join alone will not give the expected result:

from pyspark.sql import functions as F

df = df.join(df1, "col2", "left")
df.show()
+----+----+----+------+
|col2|col1|col3|  col4|
+----+----+----+------+
|   5|   b|  40|  null|
|   1|   a|  10|[4, 2]|
|   1|   b|  40|[4, 2]|
|   3|   a|  30|  null|
|   2|   a|  20|  null|
|   4|   b|  40|[3, 2]|
+----+----+----+------+

Now, if you look at the values in col2 and col4, you will observe that after the join, 1 never appears in [4, 2]. Hence, while creating df1 for the demonstration above, I passed an extra 1:

df1 = spark.createDataFrame([(1, [4, 2, 1]), (4, [3, 2])], ["col2", "col4"])  # extra 1 added here
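
With the original df1, array_contains(col4, col2) is false for every joined row (1 is not in [4, 2], and 4 is not in [3, 2]), so the filter would return an empty DataFrame; the extra 1 is what makes the matched rows, including the expected (a, 1, 10), show up.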