
We have a Spark Streaming job with checkpointing enabled. It executes correctly the first time, but throws the exception below when restarted from the checkpoint.

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
    at org.apache.spark.rdd.RDD.union(RDD.scala:565)
    at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
    at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)

Please suggest any workaround for this issue. Sample app below:

String URL = "jdbc:oracle:thin:" + USERNAME + "/" + PWD + "@//" + CONNECTION_STRING;

Map<String, String> options = ImmutableMap.of(
  "driver", "oracle.jdbc.driver.OracleDriver",
  "url", URL,
  "dbtable", "READINGS_10K",
  "fetchSize", "10000");

DataFrame OracleDB_DF = sqlContext.load("jdbc", options);
JavaPairRDD<String, Row> OracleDB_RDD = OracleDB_DF.toJavaRDD()
  .mapToPair(x -> new Tuple2<>(x.getString(0), x));

Dstream.transformToPair(rdd -> 
  rdd.mapToPair(record -> 
    new Tuple2<>(record.getKey().toString(), record))
    .join(OracleDB_RDD)) // <-- PairRDD.join inside DStream transformation
.print();

Spark version 1.6, running in YARN cluster mode.


1 Answer


Let me start with the question I'm sure you must've already been asking yourself too.

How big is the OracleDB_RDD?

If it's small enough, it could act as a lookup (dimension) table and be broadcast first. That in turn would make your solution not only work but also run efficiently, since the join becomes a per-record lookup instead of a shuffle.
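One caveat with the broadcast route: broadcast variables are not restored from a checkpoint, and the Spark Streaming programming guide recommends wrapping them in a lazily instantiated singleton so they get rebuilt after a restart. Below is a minimal, Spark-free sketch of that idea in plain Java, just to show the shape of it. The class names (LookupHolder, MapSideJoin, BroadcastJoinSketch) and the sample data are made up for illustration. In a real job the holder would presumably wrap a Broadcast created with JavaSparkContext.broadcast(...), and the loader would read the Oracle table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Lazily built, JVM-wide lookup table (hypothetical helper, not a Spark API). */
final class LookupHolder {
    private static volatile Map<String, String> instance;

    private LookupHolder() {}

    // Builds the table on first use. After a restart the static field is null
    // again in the fresh JVM, so the table is reloaded instead of being
    // expected to come back from the checkpoint.
    static Map<String, String> getInstance(Supplier<Map<String, String>> loader) {
        if (instance == null) {
            synchronized (LookupHolder.class) {
                if (instance == null) {
                    instance = loader.get();
                }
            }
        }
        return instance;
    }
}

/** Per-record lookup join, standing in for what a broadcast join does on each executor. */
final class MapSideJoin {
    private MapSideJoin() {}

    // Inner join of {key, value} records against the lookup table. Records
    // whose key is missing from the table are dropped, like PairRDD.join.
    static List<String[]> join(List<String[]> batch, Map<String, String> lookup) {
        List<String[]> joined = new ArrayList<>();
        for (String[] record : batch) {
            String match = lookup.get(record[0]);
            if (match != null) {
                joined.add(new String[] {record[0], record[1], match});
            }
        }
        return joined;
    }
}

final class BroadcastJoinSketch {
    public static void main(String[] args) {
        // Assumption: stands in for loading READINGS_10K from Oracle.
        Supplier<Map<String, String>> loader = () -> {
            Map<String, String> table = new HashMap<>();
            table.put("sensor-1", "row-for-sensor-1");
            return table;
        };

        // Assumption: stands in for one micro-batch of the DStream.
        List<String[]> batch = new ArrayList<>();
        batch.add(new String[] {"sensor-1", "21.5"});
        batch.add(new String[] {"sensor-9", "18.0"});

        for (String[] row : MapSideJoin.join(batch, LookupHolder.getInstance(loader))) {
            System.out.println(String.join(",", row));
        }
    }
}
```

The point of the holder is that the lookup is fetched inside the per-batch function rather than captured when the streaming graph is first built, so nothing created during the original driver setup has to survive in the checkpoint.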

(That's why working with Spark SQL 2.0 these days makes this and similar questions largely obsolete: choosing a broadcast join for a small table is exactly the kind of optimization the query optimizer does for you.)

If it's large, you have to create the DataFrame inside the foreachRDD action (as described in "DataFrame and SQL Operations" in the Spark Streaming programming guide), or create your own DStream that returns an RDD, so the join becomes a join between DStreams (see ConstantInputDStream).