5
votes

We are trying to use a dynamic filter in a Structured Streaming application.

Let's say we have the following pseudo-implementation of a Spark Structured Streaming application:

spark.readStream()
     .format("kafka")
     .option(...)
     ...
     .load()
     .filter(getFilter()) // <-- the dynamic part - uses filter(conditionExpr: String)
     .writeStream()
     .format("kafka")
     .option(.....)
     .start();

and getFilter() returns a String:

String getFilter() {
   // dynamic stuff to create the expression
   return expression; // e.g. "column = true"
}

Is it possible in the current version of Spark to have a dynamic filter condition? I mean, getFilter() should dynamically return a filter condition (say, refreshed every 10 minutes). We looked into broadcast variables, but we are not sure whether Structured Streaming supports such a thing.

It looks like it's not possible to update a job's configuration once it has been submitted. We deploy on YARN.

Any suggestion/option is highly appreciated.


EDIT: assume getFilter() returns:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8

After 10 minutes there can be a small change (say, dropping the first expression before the first OR), and potentially a new expression (columnA = 2) can appear, e.g.:

customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2

The goal is to have multiple filters in one Spark application without submitting multiple jobs.


2 Answers

2
votes

A broadcast variable should be OK here. You can write a typed filter like:

query.filter(x => x > bv.value).writeStream(...)

where bv is a Broadcast variable. You can update it as described here: How can I update a broadcast variable in spark streaming?
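
For reference, a minimal sketch of the wrapper pattern from the linked answer, adapted to this case; this is an assumption on my side, and RefreshableBroadcast and loadThreshold() are hypothetical names:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

// Sketch only: re-broadcasts the filter value once the refresh interval
// has elapsed. loadThreshold() stands in for a DB/REST lookup.
class RefreshableBroadcast(@transient private val spark: SparkSession,
                           refreshRateMs: Long) extends Serializable {

  @transient private var bv: Broadcast[Int] = _
  @transient private var lastUpdated = 0L

  // Call on the driver; executors should only read the returned bv.value.
  def get: Broadcast[Int] = synchronized {
    val now = System.currentTimeMillis()
    if (bv == null || lastUpdated + refreshRateMs < now) {
      if (bv != null) bv.unpersist() // drop the stale copy on the executors
      bv = spark.sparkContext.broadcast(loadThreshold())
      lastUpdated = now
    }
    bv
  }

  private def loadThreshold(): Int = 8 // hypothetical: fetch dynamically instead
}

Note that get must be evaluated on the driver; in micro-batch mode, one driver-side place to call it per trigger is foreachBatch (Spark 2.4+).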

Another solution is to provide e.g. an RPC or RESTful endpoint and poll that endpoint every 10 minutes. For example (Java, because it's simpler here):

class EndpointProxy {

     static Configuration lastValue;
     static long lastUpdated;
     static long refreshRate = 10 * 60 * 1000; // 10 minutes

     public static Configuration getConfiguration() {
          // refresh the cached value once the refresh period has elapsed
          if (lastUpdated + refreshRate < System.currentTimeMillis()) {
               lastUpdated = System.currentTimeMillis();
               lastValue = askMyAPI();
          }
          return lastValue;
     }
}


query.filter(x => x > EndpointProxy.getConfiguration().getX()).writeStream()

Edit: a hacky workaround for the user's problem:

You can create a 1-row DataFrame:

// confsDF should be in some driver-side singleton
var confsDF = Seq(someContent).toDF("someColumn")

and then use:
query.crossJoin(confsDF.as("conf")) // cross join, as we have only 1 row
     .filter("hiveUDF(conf.someColumn)")
     .writeStream()...

// driver-side thread that refreshes the DataFrame
new Thread() {
    override def run(): Unit = {
        confsDF = Seq(someNewData).toDF("someColumn")
    }
}.start()

This hack relies on Spark's default execution model: micro-batches. On each trigger the query is rebuilt, so the new data should be taken into consideration.

Alternatively, in the thread you can do:

Seq(someNewData).toDF("someColumn").createOrReplaceTempView("conf")

and then in query:

.crossJoin(spark.table("conf"))

Both should work. Keep in mind that this won't work with Continuous Processing mode.
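
Put together, a minimal sketch of the temp-view variant; the Kafka options are placeholders, fetchNewConf() is a hypothetical stand-in for wherever the configuration lives, hiveUDF is assumed to be registered, and the whole thing relies on the micro-batch rebuild described above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// hypothetical: build the new 1-row configuration value
def fetchNewConf(): String = "some content"

// publish the initial configuration as a temp view
Seq(fetchNewConf()).toDF("someColumn").createOrReplaceTempView("conf")

// driver-side thread that replaces the view every 10 minutes
new Thread() {
  override def run(): Unit = {
    while (true) {
      Thread.sleep(10 * 60 * 1000)
      Seq(fetchNewConf()).toDF("someColumn").createOrReplaceTempView("conf")
    }
  }
}.start()

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092") // placeholder
  .option("subscribe", "input-topic")             // placeholder
  .load()
  .crossJoin(spark.table("conf").as("conf")) // 1 row, so the join stays cheap
  .filter("hiveUDF(conf.someColumn)")        // assumes hiveUDF is registered
  .writeStream
  .format("console")
  .start()
  .awaitTermination()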

0
votes

Here is a simple example in which I dynamically filter records coming from a socket. Instead of the Date, you can use any REST API that updates your filter dynamically, or a lightweight ZooKeeper instance.

Note: if you are planning to use a REST API or ZooKeeper or any other option, use mapPartitions instead of filter, because then you make the API call / open the connection once per partition (see the sketch after the example below).

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Keep only the lines that match the current minute (a stand-in for a dynamic filter)
val words = lines.as[String].filter(_ == new java.util.Date().getMinutes.toString)

// Generate running word count
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
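
As a sketch of the mapPartitions variant from the note above, fetchCurrentFilter() being a hypothetical stand-in for the REST/ZooKeeper lookup (imports and lines as in the example):

// hypothetical stand-in for a remote lookup
def fetchCurrentFilter(): String = new java.util.Date().getMinutes.toString

val filtered = lines.as[String].mapPartitions { iter =>
  val allowed = fetchCurrentFilter() // one lookup per partition, not per record
  iter.filter(_ == allowed)
}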