
I am trying to convert an org.apache.spark.sql.sources.CreatableRelationProvider into an org.apache.spark.sql.execution.streaming.Sink by simply implementing addBatch(...) to call createRelation(...). However, createRelation(...) contains a df.rdd call, which causes the following error:

org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)

I was trying to look into how org.apache.spark.sql.execution.streaming.FileStreamSink, which also needs to get an RDD from the DataFrame in a streaming job, handles this. It seems to play the trick of using df.queryExecution.executedPlan.execute() to generate the RDD instead of calling .rdd.
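For context, here is a minimal sketch of what that approach could look like in a custom Sink. This is an assumption-laden illustration, not a verified implementation: NoNewPlanSink and writeRow are hypothetical names, and writeRow stands in for whatever work createRelation(...) would actually do per row.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: a Sink whose addBatch pulls the RDD out of the
// existing physical plan (via queryExecution.toRdd) instead of calling
// df.rdd, so no fresh batch plan is created for the streaming query.
class NoNewPlanSink(sqlContext: SQLContext) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val schema: StructType = data.schema

    // Placeholder: a real sink would serialize `row` according to `schema`
    // and hand it to the underlying data source. Defined locally so the
    // closure does not capture the (non-serializable) Sink instance.
    def writeRow(row: InternalRow, schema: StructType): Unit = ()

    // Reuse the incremental plan rather than building a new one.
    data.queryExecution.toRdd.foreachPartition { iter: Iterator[InternalRow] =>
      iter.foreach(row => writeRow(row, schema))
    }
  }
}
```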

However, things do not seem to be that simple:

  1. It seems the output ordering might need to be taken care of - https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L159

  2. There might be some eager-execution concerns (not sure): https://issues.apache.org/jira/browse/SPARK-20865

More details of the issue I am running into can be found here.

What would be the idiomatic way to do this conversion?


1 Answer


Dataset.rdd creates a new plan, which breaks incremental planning. Because StreamExecution uses the existing plan to collect metrics and update the watermark, we should never create a new plan; otherwise, metrics and the watermark are updated in the new plan, and StreamExecution cannot retrieve them.

Here is an example of Scala code that converts column values in Structured Streaming without creating a new plan. (conversionFunctions is assumed to be defined elsewhere as an array of per-column (InternalRow, Int) => Any converters.)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow

// Operate on the existing physical plan via queryExecution.toRdd,
// instead of df.rdd, so no new plan is created for the streaming query.
val convertedRows: RDD[Row] = df.queryExecution.toRdd.mapPartitions { iter: Iterator[InternalRow] =>
  iter.map { row =>
    val convertedValues: Array[Any] = new Array(conversionFunctions.length)
    var i = 0
    while (i < conversionFunctions.length) {
      convertedValues(i) = conversionFunctions(i)(row, i) // convert the i-th column
      i += 1
    }
    Row.fromSeq(convertedValues)
  }
}
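If you then need a batch DataFrame again (for example, to hand to createRelation(...)), one option, assuming df and convertedRows from the snippet above are in scope, is to rebuild it from the converted RDD[Row] with the public createDataFrame API. The new plan here is over a plain RDD of already-materialized rows, not over the streaming source, so the original incremental plan is untouched:

```scala
// Rebuild a batch DataFrame from the materialized rows; the streaming
// plan itself is not replaced, so metrics/watermark handling stays intact.
val batchDf = df.sparkSession.createDataFrame(convertedRows, df.schema)
```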