I need to load a file once a day and use its contents in my Spark Streaming job. To do this, I'm trying to read the file and broadcast it. Below is the code I'm using.
def loadCustomer(sc: SparkContext, customerFilePath: String) = {
  val customerList: Set[String] =
    if (customerFilePath.isEmpty) Set()
    else {
      sc.textFile(customerFilePath).collect().toSet
    }
  customerList
}
...
...
var customerList = loadCustomer(spark.sparkContext, params.customerFilePath)
// Filter by customer regular expression and customerList
val filteredTransactionStream = tranactionStream
  .filter(x => IDRegex.pattern.matcher(x.customer).matches())
  .filter { case (transactionRecord) => !(customerList.contains(transactionRecord.customer)) }
The code works fine as long as the streaming job runs continuously, but I get the error below when I restart the job. I have read that broadcast variables cannot be used together with checkpointing.
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to scala.collection.SetLike
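For reference, the Spark Streaming programming guide describes a lazily instantiated singleton for broadcast variables, so that the broadcast is re-created on the driver after a restart instead of being recovered from the checkpoint. The following is only a minimal sketch of that idea applied to the customer file, not a verified fix. It assumes the stream is a DStream; `CustomerListBroadcast`, `TransactionRecord`, `MaxAgeMs`, `isStale` and `dropListedCustomers` are hypothetical names introduced for illustration, and the 24-hour refresh interval is an assumption based on the "once a day" requirement.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Hypothetical record type; only the `customer` field comes from the question.
case class TransactionRecord(customer: String)

object CustomerListBroadcast {
  // Assumed refresh interval: reload the file once every 24 hours.
  val MaxAgeMs: Long = 24L * 60 * 60 * 1000

  @volatile private var instance: Broadcast[Set[String]] = null
  @volatile private var loadedAtMs: Long = 0L

  // Pure helper: true when nothing has been loaded yet or the copy is too old.
  def isStale(loadedAtMs: Long, nowMs: Long, maxAgeMs: Long): Boolean =
    loadedAtMs == 0L || nowMs - loadedAtMs >= maxAgeMs

  // Returns the current broadcast, creating or refreshing it on the driver.
  // After a restart from a checkpoint `instance` is null again, so the
  // broadcast is rebuilt rather than deserialized from the checkpoint.
  def getInstance(sc: SparkContext, customerFilePath: String): Broadcast[Set[String]] = {
    if (instance == null || isStale(loadedAtMs, System.currentTimeMillis(), MaxAgeMs)) {
      synchronized {
        val now = System.currentTimeMillis()
        // Re-check inside the lock so only one thread reloads the file.
        if (instance == null || isStale(loadedAtMs, now, MaxAgeMs)) {
          // Drop executor copies of the old broadcast before replacing it.
          if (instance != null) instance.unpersist(blocking = false)
          val customers: Set[String] =
            if (customerFilePath.isEmpty) Set.empty[String]
            else sc.textFile(customerFilePath).collect().toSet
          instance = sc.broadcast(customers)
          loadedAtMs = now
        }
      }
    }
    instance
  }
}

object CustomerFilter {
  // `transform` runs its function on the driver for every batch, so the
  // broadcast is looked up per batch instead of being captured once in a closure.
  def dropListedCustomers(
      stream: DStream[TransactionRecord],
      customerFilePath: String): DStream[TransactionRecord] =
    stream.transform { rdd =>
      val customers = CustomerListBroadcast.getInstance(rdd.sparkContext, customerFilePath)
      rdd.filter(record => !customers.value.contains(record.customer))
    }
}
```

In this sketch the second `.filter` in the code above would be replaced by a call such as `CustomerFilter.dropListedCustomers(stream, params.customerFilePath)`, assuming the stream's element type exposes a `customer` field. Only the file path string is captured in the checkpointed closure, not the set or the broadcast itself.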
Could you please let me know how to overcome this issue?
Thanks