
I have to load a file once every day and use it in my Spark Streaming job. For this, I'm trying to read the file and broadcast it. Below is the code that I am using.

    // Reads the customer file on the driver; returns an empty set when no path is given
    def loadCustomer(sc: SparkContext, customerFilePath: String): Set[String] =
      if (customerFilePath.isEmpty) Set.empty
      else sc.textFile(customerFilePath).collect().toSet
    ...
    ...

    var customerList = loadCustomer(spark.sparkContext, params.customerFilePath)

    // Filter by customer regular expression and customerList
    val filteredTransactionStream = tranactionStream
      .filter(x => IDRegex.pattern.matcher(x.customer).matches())
      .filter(transactionRecord => !customerList.contains(transactionRecord.customer))

The code works fine as long as the streaming job keeps running. However, when I try to restart the job, I get the error below. I have read that broadcast variables cannot be used when checkpointing is enabled.

    java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to scala.collection.SetLike

Could you please let me know how to overcome this issue?

Thanks
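On the broadcast/checkpointing point: the Spark Streaming programming guide states that broadcast variables cannot be recovered from a checkpoint, and recommends creating them through a lazily instantiated singleton so that they are rebuilt after the driver restarts. The `ClassCastException` above is plausibly a symptom of this, with a broadcast handle restored from the checkpoint resolving to a different broadcast (here a Hadoop `SerializableConfiguration`) in the restarted driver. Below is a minimal sketch of that pattern applied to the customer list, not a tested fix. `Transaction`, `CustomerListHolder`, `CustomerFilter` and `filterTransactions` are hypothetical names, and the daily reload is assumed to be a simple 24-hour refresh check.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream
import scala.util.matching.Regex

// Hypothetical record type; only the `customer` field from the question is assumed.
case class Transaction(customer: String)

// Lazily instantiated singleton holding the broadcast customer set. As a Scala
// object it is not written into the checkpoint: after a driver restart
// `instance` starts out null, so the file is re-read and re-broadcast.
object CustomerListHolder {
  private var instance: Broadcast[Set[String]] = null
  private var loadedAtMs: Long = 0L
  // Assumption: "once every day" is modelled as a fixed 24-hour refresh interval.
  private val RefreshIntervalMs: Long = 24L * 60 * 60 * 1000

  def get(sc: SparkContext, customerFilePath: String): Broadcast[Set[String]] = synchronized {
    val now = System.currentTimeMillis()
    if (instance == null || now - loadedAtMs > RefreshIntervalMs) {
      // Remove executor copies of the previous set before replacing it.
      if (instance != null) instance.unpersist()
      val customers: Set[String] =
        if (customerFilePath.isEmpty) Set.empty
        else sc.textFile(customerFilePath).collect().toSet
      instance = sc.broadcast(customers)
      loadedAtMs = now
    }
    instance
  }
}

object CustomerFilter {
  // The function given to `transform` runs on the driver once per batch, so the
  // broadcast is looked up (and refreshed when stale) at every batch.
  def filterTransactions(
      stream: DStream[Transaction],
      customerFilePath: String,
      idRegex: Regex): DStream[Transaction] =
    stream.transform { rdd =>
      val customers = CustomerListHolder.get(rdd.sparkContext, customerFilePath)
      rdd
        .filter(t => idRegex.pattern.matcher(t.customer).matches())
        .filter(t => !customers.value.contains(t.customer))
    }
}
```

Because the lookup happens inside `transform`, the function stored in the checkpoint only captures the file path and the regex, never a `Broadcast` handle.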


1 Answer


You should stop the application gracefully. Otherwise, the application may be stopped with only part of its data saved, and when you try to restart it, deserialization can fail because the data is not fully available.
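If you follow this suggestion, Spark documents two ways to stop gracefully: setting `spark.streaming.stopGracefullyOnShutdown` to `true`, so that on JVM shutdown (for example a SIGTERM) in-flight batches are allowed to finish, or calling `StreamingContext.stop(stopSparkContext = true, stopGracefully = true)` yourself. The sketch below pairs this with `StreamingContext.getOrCreate`, which is the usual way a checkpointed job is restarted. `GracefulStreaming`, `run`, `defineStreams` and the 10-second batch interval are hypothetical choices for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStreaming {
  def buildConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // On JVM shutdown, stop the StreamingContext gracefully instead of immediately.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

  // `defineStreams` is a hypothetical hook where the input stream and the
  // customer filter are set up; it must register at least one output
  // operation (e.g. foreachRDD), otherwise start() fails.
  def run(appName: String, checkpointDir: String)(defineStreams: StreamingContext => Unit): Unit = {
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(buildConf(appName), Seconds(10))
      ssc.checkpoint(checkpointDir)
      defineStreams(ssc)
      ssc
    }
    // Restore from the checkpoint if one exists; otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }

  // Programmatic alternative: wait for received data to be processed, then stop.
  def stopGracefully(ssc: StreamingContext): Unit =
    ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```

On recovery, `getOrCreate` rebuilds the stream graph from the checkpoint and does not call `createContext`, so every DStream definition has to live inside it.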