
I have a requirement to do the following in Spark/Cassandra environment:

  • Read the latest rows in table1, which contain the criteria used to filter table2. I did that using DataFrames (roughly as sketched below), and so far this is working fine.
  • For each filtered row from table2 I need to query table3 to check some state for that row.
  • After getting the state from the previous step I need to save some data to three other tables.
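
For reference, the first step looks roughly like this (a simplified sketch, not my real code; the keyspace, table, and join-column names are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FilterStep {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("cassandra-filter").getOrCreate();

        // table1 holds the latest filter criteria (keyspace/table names are placeholders)
        Dataset<Row> criteria = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "ks").option("table", "table1")
                .load();

        Dataset<Row> table2 = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "ks").option("table", "table2")
                .load();

        // keep only the table2 rows that match the criteria ("some_key" is a placeholder column)
        Dataset<Row> filtered = table2.join(criteria, "some_key");
        filtered.show();
    }
}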

The problem is that I need to query a Cassandra table and save to other Cassandra tables inside the foreach function, which means I have to pass the JavaSparkContext; unfortunately, it is not serializable (see related questions here), so I am getting the famous exception:

java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
object not serializable (class: org.apache.spark.api.java.JavaSparkContext ....

I have implemented a new class that implements ForeachFunction<Row> and made the JavaSparkContext a local variable, but I am still getting the same exception.
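
To make the failing pattern concrete, this is roughly what that class looks like (a simplified sketch; SaveFunction and the body of call are placeholders for my actual logic):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Row;

// The JavaSparkContext becomes part of the function object's state, so Spark tries to
// serialize it when shipping the function to the executors, and the exception above is thrown.
public class SaveFunction implements ForeachFunction<Row> {
    private final JavaSparkContext jsc;   // not serializable

    public SaveFunction(JavaSparkContext jsc) {
        this.jsc = jsc;
    }

    @Override
    public void call(Row row) throws Exception {
        // query table3 for this row's state and save to the other tables using jsc (placeholder)
    }
}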

Now, some may say that I should make the foreach function static, but that is not possible because I have to pass it an object that drives the logic of querying and saving the Cassandra tables. There must be a solution for this scenario, but I am not sure what I am missing here.

Could you write an example of your logic? Instead of using foreach we probably need to write that as transformations on the data (unions, diffs, joins, ... on the involved DataFrames). – maasg

1 Answer


Using the Spark context on executors is not possible, but there are at least two solutions to your problem:

  • collect the DataFrame and run a local foreach on the driver (the calls to Cassandra will then run one by one and will probably be very slow); see the first sketch below
  • combine all operations on tables 1, 2 and 3 into a single DataFrame using joins, then join the Cassandra DF with these, perform the filtering, and save to Cassandra from the executors (this is the fastest, parallel solution, but needs some extra coding); see the second sketch below
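
A minimal sketch of the first option, assuming the filtered Dataset<Row> from the question is available (the row handling inside the loop is a placeholder):

import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class DriverSideSave {
    // "filtered" is the Dataset<Row> produced by the filtering step in the question
    static void saveOnDriver(Dataset<Row> filtered) {
        List<Row> rows = filtered.collectAsList();   // all matching rows land on the driver
        for (Row row : rows) {
            // query table3 for this row's state and write to the three output tables here,
            // e.g. with the DataStax Java driver; one round trip per row, hence slow
        }
    }
}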
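
And a rough sketch of the second option; the keyspace, table, and join-column names are placeholders, and the exact joins and filters depend on your schema:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinBasedSave {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("join-based-save").getOrCreate();

        Dataset<Row> table1 = readTable(spark, "ks", "table1");
        Dataset<Row> table2 = readTable(spark, "ks", "table2");
        Dataset<Row> table3 = readTable(spark, "ks", "table3");

        // filter table2 with the criteria from table1, then bring in the state from table3,
        // all as joins so the work stays parallel on the executors
        Dataset<Row> withState = table2
                .join(table1, "criteria_key")   // placeholder join column
                .join(table3, "state_key");     // placeholder join column

        // derive whatever needs to be persisted ("state = 'READY'" is a placeholder condition)
        Dataset<Row> toSave = withState.filter("state = 'READY'");

        toSave.write()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "ks").option("table", "output_table1")
                .mode("append")
                .save();
        // repeat the write (with its own projection) for the other two output tables
    }

    private static Dataset<Row> readTable(SparkSession spark, String keyspace, String table) {
        return spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", keyspace)
                .option("table", table)
                .load();
    }
}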