
I have two Spark-Scala dataframes and I need to use one boolean column from one dataframe to filter the second dataframe. Both dataframes have the same number of rows.

In pandas I would do it like this:

import pandas as pd

df1 = pd.DataFrame({"col1": ["A", "B", "A", "C"], "boolean_column": [True, False, True, False]})
df2 = pd.DataFrame({"col1": ["Z", "X", "Y", "W"], "col2": [1, 2, 3, 4]})

filtered_df2 = df2[df1['boolean_column']]

# Expected result:
# filtered_df2 = pd.DataFrame({"col1": ["Z", "Y"], "col2": [1, 3]})

How can I do the same operation in Spark-Scala in the most time-efficient way?

My current solution is to add "boolean_column" from df1 to df2, then filter df2 by selecting only the rows with a true value in the newly added column and finally removing "boolean_column" from df2, but I'm not sure it is the best solution.

Any suggestion is appreciated.

Edit:

  • The expected output is a Spark-Scala dataframe (not a list or a column) with the same schema as the second dataframe, and only the subset of rows from df2 that satisfy the boolean mask from the "boolean_column" of df1.
  • The schema of df2 presented above is just an example. I'm expecting to receive df2 as a parameter, with any number of columns of different (and not fixed) schemas.
  • Raphael Roth: Do they have a common key to join on? Otherwise you cannot combine/join your dataframes. I would suggest filtering df1 (keep only true) and then using a left-semi join.
  • Roberto Congiu: I agree; without a join condition you are not guaranteed that the rows from the two tables will be joined in the same order.
  • Molessia: No, the datasets do not have any common keys, even though each row of the first dataset refers to the corresponding row of the second dataset (first row with first row, second row with second row, etc.).
  • abiratsis: If they don't have common keys, how do you know that df1[row1] corresponds to df2[row1]? Is the order of the two datasets consistent every time you load them? In general, the only way to apply values from one dataset to another is through a join, as @RaphaelRoth already mentioned. If the order is deterministic for both datasets, you can add a common id to both datasets with something like row_number and then join them.
  • Molessia: Exactly as you said, the order is deterministic.

2 Answers

0 votes

You can zip the RDDs underlying the two datasets and filter on the resulting tuples:

val ints     = sparkSession.sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val bools    = sparkSession.sparkContext.parallelize(List(true, false, true, false, true, false, true, false, true, false))
val filtered = ints.zip(bools).filter { case (int, bool) => bool }.map { case (int, bool) => int }
println(filtered.collect().toList) //List(1, 3, 5, 7, 9)
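Applied to the DataFrames from the question, the same idea might look like the sketch below. One caveat worth hedging: RDD `zip` requires both RDDs to have the same number of partitions with the same number of elements per partition. That holds here because both DataFrames come from equally sized local Seqs, but it is not guaranteed for data read from external sources.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("zip-filter").getOrCreate()
import spark.implicits._

val df1 = Seq(("A", true), ("B", false), ("A", true), ("C", false)).toDF("col1", "boolean_column")
val df2 = Seq(("Z", 1), ("X", 2), ("Y", 3), ("W", 4)).toDF("col1", "col2")

// Pair each row of df2 with the corresponding mask value from df1,
// keep only the rows whose mask is true, and rebuild a DataFrame
// with df2's original schema.
val mask = df1.select("boolean_column").rdd.map(_.getBoolean(0))
val kept = df2.rdd.zip(mask).filter { case (_, keep) => keep }.map { case (row, _) => row }
val filteredDf2 = spark.createDataFrame(kept, df2.schema)
```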
0 votes

I managed to solve this with the following code:

import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}

val spark = SparkSession.builder().appName("boolean-mask-filter").master("local[*]").getOrCreate()
val sqlContext = spark.sqlContext


def addColumnIndex(df: DataFrame, sqlContext: SQLContext) = sqlContext.createDataFrame(
  // Add Column index
  df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
  // Create schema
  StructType(df.schema.fields :+ StructField("columnindex", LongType, nullable = false))
)

import spark.implicits._

val DF1 = Seq(
  ("A", true),
  ("B", false),
  ("A", true),
  ("C", false)
).toDF("col1", "boolean_column")

val DF2 = Seq(
  ("Z", 1),
  ("X", 2),
  ("Y", 3),
  ("W", 4)
).toDF("col_1", "col_2")

// Add index
val DF1WithIndex = addColumnIndex(DF1, sqlContext)
val DF2WithIndex = addColumnIndex(DF2, sqlContext)

// Join
val joinDF  = DF2WithIndex
  .join(DF1WithIndex, Seq("columnindex"))
  .drop("columnindex", "col1")

// Filter
val filteredDF2 = joinDF.filter(joinDF("boolean_column")).drop("boolean_column")

The filtered dataframe will be the following:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    Z|    1|
|    Y|    3|
+-----+-----+
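Since the question's edit asks for a solution that accepts df2 with any schema, the steps above could be folded into a reusable helper. This is a sketch of my own (the function and parameter names `filterByMask` and `maskColumn` are hypothetical, not part of the answer); it relies on the same zipWithIndex-and-join technique and on the row order being deterministic:

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: keeps the rows of df2 whose positionally
// corresponding row in df1 has `maskColumn` set to true.
def filterByMask(df2: DataFrame, df1: DataFrame, maskColumn: String): DataFrame = {
  val spark = df2.sparkSession
  // Attach a positional index to every row of a DataFrame.
  def withIndex(df: DataFrame): DataFrame = spark.createDataFrame(
    df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) },
    StructType(df.schema.fields :+ StructField("columnindex", LongType, nullable = false))
  )
  // Join on the index, keep masked rows, and restore df2's schema.
  withIndex(df2)
    .join(withIndex(df1.select(maskColumn)), Seq("columnindex"))
    .filter(col(maskColumn))
    .drop("columnindex", maskColumn)
}
```

Because only `maskColumn` is selected from df1 before the join, df2's columns pass through untouched regardless of its schema.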