I am a newbie to Spark and Scala.
I want to implement a real-time Spark consumer that reads network logs from a Kafka publisher on a per-minute basis [around 1 GB of JSON log lines per minute] and finally stores the aggregated values in Elasticsearch.
The aggregation is over a few values [like bytes_in, bytes_out, etc.] grouped by a composite key [like client MAC, client IP, server MAC, server IP, etc.].
The Spark consumer I have written is:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{count, sum}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.sql._ // adds saveToEs to DataFrame

object LogsAnalyzerScalaCS {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("LOGS-AGGREGATION")
    sparkConf.set("es.nodes", "my ip address")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes.discovery", "false")
    val elasticResource = "conrec_1min/1minute"

    // Batch interval: one micro-batch every 30 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(30))

    val zkQuorum = "my zk quorum IPs:2181"
    val consumerGroupId = "LogsConsumer"
    val topics = "Logs"
    // topic -> number of receiver threads
    val topicMap = topics.split(",").map((_, 3)).toMap

    val json = KafkaUtils.createStream(ssc, zkQuorum, consumerGroupId, topicMap)
    val logJSON = json.map(_._2) // keep only the message value, drop the Kafka key

    logJSON.foreachRDD { rdd =>
      // The try sits inside foreachRDD: wrapped around the foreachRDD call itself,
      // it would only cover registering the output operation, not the per-batch work.
      try {
        if (!rdd.isEmpty()) {
          val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
          import sqlContext.implicits._
          val df = sqlContext.read.json(rdd)
          // Aggregate this batch by the composite key
          val groupedData = df
            .groupBy("id", "start_time_formated", "l2_c", "l3_c", "l4_c", "l2_s", "l3_s", "l4_s")
            .agg(
              count("f_id") as "total_f",
              sum("p_out") as "total_p_out",
              sum("p_in") as "total_p_in",
              sum("b_out") as "total_b_out",
              sum("b_in") as "total_b_in",
              sum("duration") as "total_duration")
          val dataForES = groupedData.withColumnRenamed("start_time_formated", "start_time")
          dataForES.saveToEs(elasticResource)
          dataForES.show()
        }
      } catch {
        case e: Exception => println("Exception has occurred : " + e.getMessage)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // Lazily created SQLContext, reused across batches
  object SQLContextSingleton {
    @transient private var instance: SQLContext = _
    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }
}
First of all, I would like to know whether my approach is correct at all [considering I need 1-minute log aggregation].
There seem to be some issues with this code:
- This consumer pulls data from the Kafka broker every 30 seconds and saves the aggregation of that 30 seconds of data to Elasticsearch, which increases the number of rows in Elasticsearch per unique key [at least 2 entries per minute]. The UI tool [let's say Kibana] then has to aggregate further. If I increase the batch interval from 30 seconds to 60 seconds, the aggregation takes much longer and the job is no longer real time.
- I want only one row per key to be saved in Elasticsearch. So I want to keep aggregating until no new values arrive for a key in the data pulled from the Kafka broker [on a per-minute basis]. After some googling I found that this could be achieved with the groupByKey() and updateStateByKey() functions, but I cannot work out how to use them in my case [should I convert the JSON log lines into flat strings of values and then apply these functions to them]? And if I use these functions, at what point should I save the final aggregated values to Elasticsearch?
- Is there any other way of achieving it?
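To make the second point concrete, this is roughly how I understand the update function for updateStateByKey() would have to look. It is only a minimal sketch in plain Scala, not a working job: FlowKey, Totals, StateSketch and updateTotals are names I made up, the fields mirror the columns used in the aggregation above, and the JSON parsing and the Elasticsearch write are left out.

```scala
// Hypothetical names throughout; nothing here comes from Spark itself.
object StateSketch {
  // Composite key: one state entry per distinct combination of these fields.
  final case class FlowKey(id: String, startTime: String,
                           l2c: String, l3c: String, l4c: String,
                           l2s: String, l3s: String, l4s: String)

  // Running totals kept per key across batches.
  final case class Totals(flows: Long, pOut: Long, pIn: Long,
                          bOut: Long, bIn: Long, duration: Long) {
    def +(o: Totals): Totals =
      Totals(flows + o.flows, pOut + o.pOut, pIn + o.pIn,
             bOut + o.bOut, bIn + o.bIn, duration + o.duration)
  }

  // Shape that updateStateByKey expects: the values seen for a key in the
  // current batch, plus the state carried over from earlier batches
  // (None the first time the key appears). Returning None drops the key.
  def updateTotals(batch: Seq[Totals], state: Option[Totals]): Option[Totals] =
    (state.toSeq ++ batch).reduceOption(_ + _)
}

// Assumed wiring inside the streaming job (stateful operations also need
// ssc.checkpoint(<some directory>) to be set):
//   val pairs: DStream[(FlowKey, Totals)] = ... // one pair per parsed JSON line
//   val running = pairs.updateStateByKey(StateSketch.updateTotals _)
```

With this shape the state for a key keeps growing for as long as records for that key arrive. What I still cannot see is when a key counts as "finished" so that its total can be written to Elasticsearch exactly once.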
Your quick help will be appreciated.
Regards, Bhupesh