
I want to filter a DataFrame by applying the regex values stored in one of its columns to another column.

Example:
Id Column1 RegexColumm
1  Abc     A.*
2  Def     B.*
3  Ghi     G.*

Filtering the DataFrame using RegexColumm should return the rows with ids 1 and 3.

Is there a way to do this in Spark 1.5.1? I don't want to use a UDF, as that might cause scalability issues; I'm looking for a Spark-native API.
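
For reference, here is a minimal sketch of how the example DataFrame above can be built (assuming a Spark 1.5-style sqlContext with its implicits in scope; the column names follow the table above):

import sqlContext.implicits._

//example data from the question
val df = Seq(
  (1, "Abc", "A.*"),
  (2, "Def", "B.*"),
  (3, "Ghi", "G.*")
).toDF("Id", "Column1", "RegexColumm")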


2 Answers


You can convert the df to an rdd, then match the regex while traversing each row and keep only the matching data, without using any UDF.

Example:

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

df.show()
//+---+-------+--------+
//| id|column1|regexCol|
//+---+-------+--------+
//|  1|    Abc|     A.*|
//|  2|    Def|     B.*|
//|  3|    Ghi|     G.*|
//+---+-------+--------+

//creating new schema to add new boolean field
val sch = StructType(df.schema.fields ++ Array(StructField("bool_col", BooleanType, false)))

//convert df to rdd and match the regex using .map
val rdd = df.rdd.map(row => {
  val regex = row.getAs[String]("regexCol")
  val bool_col = row.getAs[String]("column1").matches(regex)
  //append the match result as the new boolean field
  Row.fromSeq(row.toSeq :+ bool_col)
})

//convert rdd back to dataframe, keep rows where bool_col is true, then drop the helper column
val final_df = sqlContext.createDataFrame(rdd, sch).where(col("bool_col")).drop("bool_col")
final_df.show(10)

//+---+-------+--------+
//| id|column1|regexCol|
//+---+-------+--------+
//|  1|    Abc|     A.*|
//|  3|    Ghi|     G.*|
//+---+-------+--------+

UPDATE:

Instead of .map we can use .mapPartitions (map vs mapPartitions):

val rdd = df.rdd.mapPartitions(
    partitions => {
      partitions.map(row => {
        val regex = row.getAs[String]("regexCol")
        val bool_col = row.getAs[String]("column1").matches(regex)
        Row.fromSeq(row.toSeq :+ bool_col)
      })
    })
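
Side note (my addition, not part of the answer above): it may also be possible to stay in the DataFrame API by using the SQL RLIKE operator with a column on the right-hand side; I haven't verified this on 1.5.1 specifically. Also note that rlike does a substring search, while String.matches requires the whole string to match, so anchor the pattern with ^ and $ if you need exact matches-style semantics:

//hedged sketch: rlike with a per-row pattern column; for the sample data
//this should keep ids 1 and 3, same as the rdd approach
val native_df = df.filter("column1 rlike regexCol")
native_df.show()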
scala> val df = Seq((1,"Abc","A.*"),(2,"Def","B.*"),(3,"Ghi","G.*")).toDF("id","Column1","RegexColumm")
df: org.apache.spark.sql.DataFrame = [id: int, Column1: string ... 1 more field]

scala> val requiredDF = df.filter(x=> x.getAs[String]("Column1").matches(x.getAs[String]("RegexColumm")))
requiredDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, Column1: string ... 1 more field]

scala> requiredDF.show
+---+-------+-----------+
| id|Column1|RegexColumm|
+---+-------+-----------+
|  1|    Abc|        A.*|
|  3|    Ghi|        G.*|
+---+-------+-----------+

You can filter like above; I think this is what you are looking for. Please do let me know if it helps you.
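
Two caveats worth adding (my notes, not the answerer's): the lambda form df.filter(x => ...) relies on the typed Dataset API, i.e. Spark 2.x (the Dataset[Row] type in the transcript suggests that), so on 1.5.1 the rdd-based approach from the first answer applies. Also, String.matches only succeeds when the regex matches the entire value, unlike SQL rlike, which searches for a substring match:

//String.matches is anchored to the whole string
"Abc".matches("A.*")    // true:  the pattern covers the entire value
"Abc".matches("b")      // false: "b" matches only a substring
"Abc".matches(".*b.*")  // true:  widen the pattern to allow a partial match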