2
votes

I'm using Spark Structured Streaming to process high-volume data from a Kafka queue and run some heavy ML computation on it, and I need to write the results to Elasticsearch.

I tried using a ForeachWriter, but I can't get a SparkContext inside it. The other option is probably to do an HTTP POST inside the ForeachWriter.
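
For reference, the HTTP variant could look roughly like the sketch below. It assumes the stream has already been reduced to a Dataset[String] of JSON documents and that Elasticsearch accepts documents at the given URL; the class name EsHttpWriter and the endpoint are made up for illustration.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.ForeachWriter

// Hypothetical writer: POSTs each JSON string to an assumed Elasticsearch endpoint.
class EsHttpWriter(endpoint: String) extends ForeachWriter[String] {

  // Called once per partition and batch; returning true means "process this partition".
  override def open(partitionId: Long, version: Long): Boolean = true

  // Sends one document per request and fails the task on a non-2xx response.
  override def process(json: String): Unit = {
    val conn = new URL(endpoint).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setDoOutput(true)
      conn.setRequestProperty("Content-Type", "application/json")
      val out = conn.getOutputStream
      try out.write(json.getBytes(StandardCharsets.UTF_8)) finally out.close()
      val status = conn.getResponseCode
      if (status >= 300) throw new RuntimeException(s"Elasticsearch returned HTTP $status")
    } finally conn.disconnect()
  }

  // Nothing to release: each connection is closed in process().
  override def close(errorOrNull: Throwable): Unit = ()
}
```

It would be plugged in with something like `jsonDs.writeStream.foreach(new EsHttpWriter("http://localhost:9200/index/type")).start()`. One request per record is likely too slow for high volume, so buffering records and using the `_bulk` endpoint would probably be needed.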

Right now, I am thinking of writing my own ElasticsearchSink.

Is there any documentation on how to create a Sink for Spark Structured Streaming?

2 Answers

1
votes

You can take a look at ForeachSink. It shows how to implement a Sink and how to convert a DataFrame to an RDD (this is very tricky and is explained in a large comment). However, please be aware that the Sink API is still private and immature, so it might change in the future.

4
votes

If you are using Spark 2.2+ and ES 6.x, then there is an ES sink out of the box:

df
  .writeStream
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql") 
  .option("es.mapping.id", "mappingId")
  .start("index/type") // index/type
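
For context, a fuller end-to-end sketch of the same idea, reading from Kafka and writing with the built-in sink. The broker address, topic name, Elasticsearch host, checkpoint path and column names are all assumptions; the elasticsearch-hadoop sink needs a checkpoint location, as far as I know.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object KafkaToEs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-to-es").getOrCreate()

    // Assumed Kafka broker and topic; key and value arrive as binary columns.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(key AS STRING) AS mappingId", "CAST(value AS STRING) AS payload")

    val query = df.writeStream
      .outputMode(OutputMode.Append())
      .format("org.elasticsearch.spark.sql")
      .option("checkpointLocation", "/tmp/es-checkpoint") // placeholder path
      .option("es.nodes", "localhost")                    // assumed ES host
      .option("es.port", "9200")
      .option("es.mapping.id", "mappingId")               // column used as document id
      .start("index/type")

    query.awaitTermination()
  }
}
```

This needs the spark-sql-kafka and elasticsearch-spark artifacts on the classpath, in versions matching your Spark and Scala build.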

If you are using ES 5.x, like I was, you need to implement an EsSink and an EsSinkProvider:

EsSinkProvider:

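import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class EsSinkProvider extends StreamSinkProvider with DataSourceRegister {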

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {

    EsSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "my-es-sink"
}

EsSink:

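import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
import org.elasticsearch.spark.rdd.EsSpark

// Named EsSink so that it matches the call in EsSinkProvider.createSink
case class EsSink(sqlContext: SQLContext,
                  options: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode)
  extends Sink {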


  override def addBatch(batchId: Long, df: DataFrame): Unit = synchronized {
    val schema = df.schema
    // this ensures that the same query plan will be used
    val rdd: RDD[String] = df.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row]).map(_.getAs[String](0))
    }

    // from org.elasticsearch.spark.rdd library
    EsSpark.saveJsonToEs(rdd, "index/type", Map("es.mapping.id" -> "mappingId"))
  }
}
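
The sink above hardcodes the index and the mapping id. One possible variation is to read them from the options map that Spark passes to createSink, so they can be set with .option(...) on the writer. The helper name EsSinkOptions is made up for illustration; es.resource is the regular elasticsearch-hadoop setting for the target index/type.

```scala
// Hypothetical helper, not part of Spark or elasticsearch-hadoop.
object EsSinkOptions {

  // Target "index/type", taken from the es.resource option set on the writer.
  def resource(options: Map[String, String]): String =
    options.getOrElse("es.resource",
      throw new IllegalArgumentException("Missing required option 'es.resource'"))

  // Keep only the es.* settings so unrelated options are not forwarded to EsSpark.
  def esConfig(options: Map[String, String]): Map[String, String] =
    options.filter { case (key, _) => key.startsWith("es.") }
}
```

Inside addBatch the call would then become `EsSpark.saveJsonToEs(rdd, EsSinkOptions.resource(options), EsSinkOptions.esConfig(options))`.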

Lastly, when writing the stream, use the provider class as the format:

df
  .writeStream
  .queryName("ES-Writer")
  .outputMode(OutputMode.Append())
  .format("path.to.EsSinkProvider")
  .start()
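
Two details that may be worth adding, as far as I know. A query with a custom sink needs a checkpoint location, and the short name returned by shortName() only resolves if the provider is registered through Java's ServiceLoader, that is, a resource file META-INF/services/org.apache.spark.sql.sources.DataSourceRegister containing the provider's fully qualified class name. A sketch under those assumptions (the object name StartEsQuery and the checkpoint path are placeholders):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object StartEsQuery {
  // `df` is the streaming DataFrame to write; `checkpointDir` is a placeholder path.
  def start(df: DataFrame, checkpointDir: String): StreamingQuery =
    df.writeStream
      .queryName("ES-Writer")
      .outputMode(OutputMode.Append())
      // Resolves only if the provider is listed in
      // META-INF/services/org.apache.spark.sql.sources.DataSourceRegister;
      // otherwise pass the fully qualified class name instead.
      .format("my-es-sink")
      .option("checkpointLocation", checkpointDir)
      .start()
}
```

Calling `StartEsQuery.start(df, "/tmp/es-sink-checkpoint").awaitTermination()` then keeps the driver alive while the query runs.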