
I need to run some queries against a table from inside a UDF in Structured Streaming. The problem is that if I try to use spark.sql inside the UDF, I get a NullPointerException. What is the best approach here?

Basically, I need to stream from one table and then use that data to run some range queries against another table.

For example:

val appleFilter = udf((appleId : String) => {
     val query = "select count(*) from appleMart where appleId='"+appleId+"'"
     val appleCount = spark.sql(query).collect().head.getLong(0)
     (appleCount>0)
})

val newApple = apples.filter(appleFilter($"appleId"))

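This is not really the right approach for this task: you can't run separate queries from inside a UDF. The UDF is executed on the executors, where the SparkSession (spark) is not available, which is why you get the NullPointerException. And even if it worked, Spark wouldn't be able to parallelize or optimize these per-row queries.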

The better way is to join your streaming DataFrame with the appleMart DataFrame, which lets Spark optimize the whole pipeline. As I understand from your code, you just need to check whether an apple with the given ID exists. In that case, an inner join is enough: it keeps only the IDs that have rows in appleMart, something like this:

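// Read the lookup table as a static DataFrame
val appleMart = spark.read.format("delta").load("path_to_delta")
// Inner join on appleId; joining on Seq("appleId") avoids a duplicated appleId column
val newApple = apples.join(appleMart, Seq("appleId"))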
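Because the UDF only tests whether an appleId exists in appleMart, a left semi join mirrors that filter more closely than an inner join: each streaming row is kept at most once, even if appleId repeats in appleMart, and no appleMart columns are added. Structured Streaming supports left semi joins with the streaming DataFrame on the left and the static one on the right. The sketch below is illustrative only and assumes Spark 3.x: it replaces the real stream with MemoryStream (an internal Spark testing source whose API may change between versions) and the Delta table with a small in-memory DataFrame; the names AppleSemiJoinDemo and filtered_apples are hypothetical.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object AppleSemiJoinDemo {
  // Returns the appleIds that survive the stream-static left semi join.
  def run(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    // Stand-in for the lookup table; in the question this would be
    // spark.read.format("delta").load("path_to_delta").
    // "a1" appears twice on purpose: the semi join does not duplicate rows.
    val appleMart = Seq("a1", "a1", "a3").toDF("appleId")

    // MemoryStream is a testing source, used here only to simulate the stream.
    implicit val sqlCtx: SQLContext = spark.sqlContext
    val input = MemoryStream[String]
    val apples = input.toDF().withColumnRenamed("value", "appleId")

    // Replaces the UDF: keep streaming rows whose appleId exists in appleMart.
    val newApple = apples.join(appleMart, Seq("appleId"), "left_semi")

    val query = newApple.writeStream
      .format("memory")             // in-memory sink, for demonstration only
      .queryName("filtered_apples")
      .outputMode("append")
      .start()

    input.addData("a1", "a2", "a3", "a4")
    query.processAllAvailable()     // block until the added data is processed

    val result = spark.table("filtered_apples").as[String].collect().sorted.toSeq
    query.stop()
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("apple-semi-join-demo")
      .getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```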

If for some reason you need to keep apples entries that don't exist in appleMart, you can use a left outer join instead.
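A minimal sketch of the left join variant, assuming you want every apples row plus a boolean telling whether its appleId exists in appleMart. For brevity it uses small static DataFrames; the same join call should work with the streaming apples DataFrame on the left, since stream-static left outer joins are supported. The names AppleLeftJoinDemo and inMart are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit}

object AppleLeftJoinDemo {
  // Returns (appleId, inMart) pairs, sorted by appleId.
  def run(spark: SparkSession): Seq[(String, Boolean)] = {
    import spark.implicits._

    val apples = Seq("a1", "a2", "a3").toDF("appleId")
    val appleMart = Seq("a1", "a1", "a3").toDF("appleId")

    // One marker row per distinct appleId, so the join cannot duplicate apples rows.
    val martIds = appleMart.select($"appleId").distinct().withColumn("inMart", lit(true))

    // Left outer join keeps every apples row; unmatched rows get null, mapped to false.
    val flagged = apples
      .join(martIds, Seq("appleId"), "left_outer")
      .withColumn("inMart", coalesce($"inMart", lit(false)))

    flagged.as[(String, Boolean)].collect().sortBy(_._1).toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("apple-left-join-demo")
      .getOrCreate()
    run(spark).foreach(println)
    spark.stop()
  }
}
```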

P.S. If appleMart doesn't change very often, you can cache it. However, for lookup tables in streaming jobs, something like Cassandra could be better from a performance standpoint.
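Caching the lookup table could look roughly like this, assuming appleMart is small and rarely updated. The trade-off is that a cached DataFrame keeps serving the rows it cached, so changes to the underlying table are not picked up until you unpersist() it and read it again. The helper names cachedLookup and filterApples are hypothetical, and static DataFrames stand in for the real tables.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CachedLookupDemo {
  // Hypothetical helper: keep only the lookup key, deduplicate it and cache the result.
  // cache() is lazy; the data is materialized the first time a query uses it.
  def cachedLookup(appleMart: DataFrame): DataFrame =
    appleMart.select("appleId").distinct().cache()

  // Filter apples (static or streaming) against the cached lookup table.
  def filterApples(apples: DataFrame, lookup: DataFrame): DataFrame =
    apples.join(lookup, Seq("appleId"), "left_semi")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cached-lookup-demo")
      .getOrCreate()
    import spark.implicits._

    // In the question this would be spark.read.format("delta").load("path_to_delta").
    val lookup = cachedLookup(Seq("a1", "a3").toDF("appleId"))
    filterApples(Seq("a1", "a2").toDF("appleId"), lookup).show()

    // When appleMart changes, drop the stale copy and re-read the table.
    lookup.unpersist()
    spark.stop()
  }
}
```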