We have a Spark Streaming job with checkpointing enabled. It executes correctly the first time, but throws the exception below when restarted from the checkpoint:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
    at org.apache.spark.rdd.RDD.union(RDD.scala:565)
    at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
    at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
Please suggest a workaround for this issue. My understanding (possibly wrong) is that OracleDB_RDD is captured in the DStream closure and serialized into the checkpoint, and after restart the deserialized RDD no longer has a live SparkContext, which trips the SPARK-5063 guard. Sample app below:
// Load the Oracle lookup table once on the driver (Spark 1.6 JDBC source).
String URL = "jdbc:oracle:thin:" + USERNAME + "/" + PWD + "@//" + CONNECTION_STRING;
Map<String, String> options = ImmutableMap.of(
    "driver", "oracle.jdbc.driver.OracleDriver",
    "url", URL,
    "dbtable", "READINGS_10K",
    "fetchSize", "10000");
DataFrame OracleDB_DF = sqlContext.read().format("jdbc").options(options).load();
// Key the table by its first column so it can be joined against the stream.
JavaPairRDD<String, Row> OracleDB_RDD = OracleDB_DF.toJavaRDD()
    .mapToPair(x -> new Tuple2<>(x.getString(0), x));
// dStream is the checkpointed pair DStream; the join below references
// OracleDB_RDD from inside the transformation, which is what fails on restart.
dStream.transformToPair(rdd ->
        rdd.mapToPair(record ->
                new Tuple2<>(record.getKey().toString(), record))
            .join(OracleDB_RDD)) // <-- PairRDD.join inside DStream transformation
    .print();
Spark version 1.6, running in YARN cluster mode.
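One direction I have been considering, based on the SPARK-5063 note and the Spark Streaming guide's warning that broadcast variables cannot be recovered from a checkpoint, is to stop referencing OracleDB_RDD inside the transformation and instead rebuild the lookup data lazily through a singleton that broadcasts the 10K-row table as a map. The sketch below is untested against the checkpoint restart, and the helper names (OracleLookup, getInstance) are my own, not part of the failing job:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

// Hypothetical helper: lazily builds a broadcast lookup map on first use,
// so it is re-created after a restart instead of being serialized into the
// checkpoint. transform() bodies run on the driver, so the JDBC load and
// collectAsMap() here are legal.
class OracleLookup {
    private static volatile Broadcast<Map<String, Row>> instance = null;

    static Broadcast<Map<String, Row>> getInstance(JavaSparkContext jsc,
                                                   Map<String, String> jdbcOptions) {
        if (instance == null) {
            synchronized (OracleLookup.class) {
                if (instance == null) {
                    Map<String, Row> lookup = SQLContext.getOrCreate(jsc.sc())
                        .read().format("jdbc").options(jdbcOptions).load()
                        .toJavaRDD()
                        .mapToPair(x -> new Tuple2<>(x.getString(0), x))
                        .collectAsMap();
                    instance = jsc.broadcast(new HashMap<>(lookup));
                }
            }
        }
        return instance;
    }
}

// Map-side join against the broadcast map instead of PairRDD.join. Unlike
// an inner join, a missing key yields null here, so that would need handling.
dStream.transformToPair(rdd -> {
    JavaSparkContext jsc = new JavaSparkContext(rdd.context());
    Broadcast<Map<String, Row>> lookup = OracleLookup.getInstance(jsc, options);
    return rdd.mapToPair(record -> {
        String key = record.getKey().toString();
        return new Tuple2<>(key, new Tuple2<>(record, lookup.value().get(key)));
    });
}).print();

Is this singleton-plus-broadcast pattern the right way to handle checkpoint recovery here, or is there a simpler workaround?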