You can use spark-redshift with Structured Streaming, but you have to add a few classes in your own fork. First, you need a RedshiftSink that implements the org.apache.spark.sql.execution.streaming.Sink interface:
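    package com.databricks.spark.redshift

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.execution.streaming.Sink
    import org.slf4j.LoggerFactory
    import com.databricks.spark.redshift.Parameters.MergedParameters

    private[redshift] class RedshiftSink(
        sqlContext: SQLContext,
        parameters: MergedParameters,
        redshiftWriter: RedshiftWriter) extends Sink {

      private val log = LoggerFactory.getLogger(getClass)
      // Id of the last batch written by this sink instance (held in memory only).
      @volatile private var latestBatchId = -1L

      override def toString(): String = "RedshiftSink"

      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        if (batchId <= latestBatchId) {
          // The same batch can be delivered again after a retry; do not write it twice.
          log.info(s"Skipping already committed batch $batchId")
        } else {
          val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
          redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
          latestBatchId = batchId
        }
      }
    }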
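The batch-skipping check in addBatch can be looked at in isolation. The following is a minimal sketch in plain Scala with no Spark dependency; IdempotentBatchWriter and IdempotentBatchDemo are hypothetical names used only for illustration, and the `write` callback stands in for the call to saveToRedshift.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the sink: `write` plays the role of saveToRedshift.
final class IdempotentBatchWriter(write: Long => Unit) {
  // Last batch id that was written; -1 means nothing has been written yet.
  @volatile private var latestBatchId = -1L

  /** Returns true if the batch was written, false if it was skipped. */
  def addBatch(batchId: Long): Boolean = {
    if (batchId <= latestBatchId) {
      false
    } else {
      write(batchId)
      latestBatchId = batchId
      true
    }
  }
}

object IdempotentBatchDemo {
  /** Feeds a sequence with a repeated batch id and returns the ids actually written. */
  def run(): List[Long] = {
    val written = ArrayBuffer.empty[Long]
    val writer = new IdempotentBatchWriter(id => written += id)
    // Batch 1 is delivered twice, as can happen when a batch is retried.
    Seq(0L, 1L, 1L, 2L).foreach(id => writer.addBatch(id))
    written.toList
  }
}
```

Note that in this sketch, as in the sink above, the last written batch id lives only in memory, so the check only guards against repeats seen by the same sink instance.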
Then extend com.databricks.spark.redshift.DefaultSource so that it also implements org.apache.spark.sql.sources.StreamSinkProvider:
    /**
     * Creates a Sink instance
     */
    override def createSink(
        sqlContext: SQLContext,
        parameters: Map[String, String],
        partitionColumns: Seq[String],
        outputMode: OutputMode): Sink = {
      new RedshiftSink(
        sqlContext,
        Parameters.mergeParameters(parameters),
        new RedshiftWriter(jdbcWrapper, s3ClientFactory))
    }
Now you should be able to use it in structured streaming:
    dataset.writeStream()
        .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
        .format("com.databricks.spark.redshift")
        .outputMode(OutputMode.Append())
        .queryName("redshift-stream")
        .start()
Update
To fix the issue with metrics not being reported to StreamExecution, RedshiftWriter.unloadData() has to be changed to use data.queryExecution.toRdd.mapPartitions instead of data.rdd.mapPartitions, because data.rdd creates a new plan that is not visible to StreamExecution (which uses the existing plan to collect metrics). This also requires changing the conversion functions to work on InternalRow:
    val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
      field.dataType match {
        case DateType =>
          val dateFormat = Conversions.createRedshiftDateFormat()
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else dateFormat.format(
              DateTimeUtils.toJavaDate(row.getInt(ordinal)))
          }
        case TimestampType =>
          val timestampFormat = Conversions.createRedshiftTimestampFormat()
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else timestampFormat.format(
              DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
          }
        case StringType =>
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else row.getString(ordinal)
          }
        case dt: DataType =>
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
          }
      }
    }
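Because each converter above takes an InternalRow plus a column ordinal, the partition loop in unloadData() has to pass both. Here is a rough sketch of what that loop might look like. The helper name applyConversions is hypothetical and not part of spark-redshift, and it covers only the row conversion step, not the rest of unloadData(). It assumes Spark is on the classpath.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow

object ConversionSketch {
  // Hypothetical helper: applies one converter per column to every row.
  // It iterates over queryExecution.toRdd, so the existing plan is executed
  // instead of the new plan that data.rdd would create.
  def applyConversions(
      data: DataFrame,
      conversionFunctions: Array[(InternalRow, Int) => Any]): RDD[Row] = {
    data.queryExecution.toRdd.mapPartitions { iter =>
      iter.map { row =>
        val converted = new Array[Any](conversionFunctions.length)
        var i = 0
        while (i < conversionFunctions.length) {
          // Each converter handles nulls itself via row.isNullAt(ordinal).
          converted(i) = conversionFunctions(i)(row, i)
          i += 1
        }
        // Copy the values into a new external Row before moving on,
        // since the InternalRow object may be reused by the iterator.
        Row.fromSeq(converted)
      }
    }
  }
}
```

The converted values are copied into a fresh Row immediately, which matters because rows coming from queryExecution.toRdd should not be held across iterations.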