I have a requirement to do the following in a Spark/Cassandra environment:
- Read the latest rows in table1, which contains the criteria for filtering another table, table2. (I did that using DataFrames, and so far it is working fine; a sketch follows after this list.)
- For each filtered row from table2, I need to query table3 to check some state for that row.
- After getting the state from the previous step, I need to save some data to three other tables.
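For context, the working part (the first two steps) looks roughly like this. The keyspace `ks`, the table options, and the join column `criteria_key` are placeholders, not my real schema:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("cassandra-filtering")
        .getOrCreate();

// Read both tables through the spark-cassandra-connector DataFrame source.
Dataset<Row> table1 = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "ks").option("table", "table1")
        .load();

Dataset<Row> table2 = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "ks").option("table", "table2")
        .load();

// Apply the latest criteria from table1 to table2, e.g. via a join on a
// shared key column (placeholder name criteria_key).
Dataset<Row> filtered = table2.join(table1, "criteria_key");
```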
The problem is that I need to query a Cassandra table and save to other Cassandra tables inside a foreach function, which means I have to pass the Java Spark context into the closure. Unfortunately, it is not serializable (see related questions here), so I am getting the famous exception:
```
java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
object not serializable (class: org.apache.spark.api.java.JavaSparkContext ....
```
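In code, the failing shape is roughly the following (a reconstruction, not my exact code; `doSomethingWith` is a placeholder for the query/save logic):

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// filtered is the Dataset<Row> produced from table2 above.
filtered.foreach((ForeachFunction<Row>) row -> {
    // Any reference to jsc here pulls it into the closure that Spark
    // serializes and ships to the executors, hence the exception.
    // Contexts can only be used on the driver in any case.
    doSomethingWith(jsc, row); // placeholder for the table3 query + saves
});
```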
I have implemented a new class that implements ForeachFunction<Row> and made the Java Spark context a local variable, but I am still getting the same exception.
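The attempt looks roughly like this (again a reconstruction; the class name is a placeholder). An instance field is serialized along with the instance, so holding the context this way fails in the same way:

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;

public class TableStateFunction implements ForeachFunction<Row> {
    // Serialized together with the instance when Spark ships it to the
    // executors, so the same NotSerializableException occurs. Marking it
    // transient would avoid the exception, but the field would then be
    // null on the executors and therefore still unusable there.
    private final JavaSparkContext jsc;

    public TableStateFunction(JavaSparkContext jsc) {
        this.jsc = jsc;
    }

    @Override
    public void call(Row row) {
        // query table3 for this row's state, then save to the other tables
    }
}
```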
Now, some may say that I should make the foreach function static, but that is not possible, as I have to pass it an object that helps with the logic of querying/saving the Cassandra tables. There must be a solution for this scenario, but I am not sure what I am missing here.
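One pattern I have seen suggested elsewhere for this situation (not something from my code; I am not sure it fits here) is to capture the connector's CassandraConnector, which is serializable, and open plain driver sessions on the executors. A minimal sketch, assuming spark-cassandra-connector 2.x:

```java
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;

// CassandraConnector is serializable, so the closure can capture it and
// open short-lived driver sessions on the executors.
CassandraConnector connector =
        CassandraConnector.apply(spark.sparkContext().getConf());

filtered.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
    try (Session session = connector.openSession()) {
        while (rows.hasNext()) {
            Row row = rows.next();
            // query table3 for this row's state with session.execute(...),
            // then write to the three output tables (prepared statements
            // would be the idiomatic choice here)
        }
    }
});
```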
Instead of using foreach, we probably need to write that as transformations on the data (unions, diff, joins, ... on the involved dataframes). – maasg
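If I understand that comment correctly, steps 3 and 4 would become something like the following sketch (keyspace, table, and column names are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import static org.apache.spark.sql.functions.col;

// Step 3 as a join instead of per-row queries: attach each row's state
// from table3 via a shared key column (placeholder name state_key).
Dataset<Row> table3 = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "ks").option("table", "table3")
        .load();

Dataset<Row> withState = filtered.join(table3, "state_key");

// Step 4 as bulk writes: derive each output as a transformation and save
// it once, rather than writing row by row inside foreach.
withState.filter(col("state").equalTo("A"))
        .write().format("org.apache.spark.sql.cassandra")
        .option("keyspace", "ks").option("table", "out_a")
        .mode(SaveMode.Append)
        .save();
// ... repeat with the appropriate filter/projection for out_b and out_c
```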