I'm trying to index Kafka messages into Elasticsearch using Spark Streaming.
The messages in Kafka look like this:
"Tom 34 happy Paris"
I would like to define the structure in Spark Streaming so that this message gets indexed into Elasticsearch as:

{ "Name": "Tom", "Age": 34, "Status": "happy", "City": "Paris" }
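Conceptually, what I want is to split each line and attach a key name to each field, something like this sketch (the field names and their order are just my reading of the message format, not anything I have working):

def toDoc(line: String): Map[String, Any] = {
  val Array(name, age, status, city) = line.split(" ")
  // attach the key names I want in Elasticsearch to each positional field
  Map("Name" -> name, "Age" -> age.toInt, "Status" -> status, "City" -> city)
}

// toDoc("Tom 34 happy Paris") gives Map(Name -> Tom, Age -> 34, Status -> happy, City -> Paris)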
I've read about RDD transformations but couldn't find how to attach key names to the values. I need your help.

Below is my code, which so far only does a word count on the messages received from Kafka:
package com.examples
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
object MainExample {

  def main(args: Array[String]) {
    val logger = Logger.getLogger(this.getClass)

    val jobName = "MainExample"
    val conf = new SparkConf().setAppName(jobName)
    val ssc = new StreamingContext(conf, Seconds(2))
    // reduceByKeyAndWindow with an inverse function requires a checkpoint directory
    ssc.checkpoint("checkpoint")

    val zkQuorum = "localhost:2181"
    val group = ""     // Kafka consumer group id
    val topics = "test"
    val numThreads = 1

    // topic name -> number of consumer threads per topic
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Kafka records arrive as (key, message) pairs; keep only the message
    val lines = lineMap.map(_._2)

    // word count over a 10-minute window, recomputed every 2 seconds
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(x => (x, 1))
    val wordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
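From what I've read, the elasticsearch-hadoop connector's EsSpark.saveToEs can index an RDD of Maps, so I imagine replacing the word-count part with something like the sketch below. This is untested: "people/person" is a made-up index/type, and I'm assuming the elasticsearch-spark dependency is on the classpath and "es.nodes" is set in the SparkConf:

import org.elasticsearch.spark.rdd.EsSpark

// parse each message into a document with named fields
val docs = lines.map { line =>
  val Array(name, age, status, city) = line.split(" ")
  Map("Name" -> name, "Age" -> age.toInt, "Status" -> status, "City" -> city)
}

// index each micro-batch into Elasticsearch ("people/person" is a placeholder)
docs.foreachRDD { rdd =>
  EsSpark.saveToEs(rdd, "people/person")
}

Is this the right approach, or is there a better way to define the keys for the values in Spark Streaming?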