My RDD contains newline-separated records. A single RDD looks like this:
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
I want to convert it into an Array[Map[k, v]], where each element of the Array is a Map[k, v] corresponding to one record. The Array will contain N such maps, one per record in the RDD.
I am new to both Scala and Spark, so any help with the conversion is appreciated.
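
Conceptually, I think the parsing step is something like the sketch below. This is only a rough sketch of what I'm after: parseRecord is a hypothetical helper name (not in my code), and I'm assuming each RDD element holds exactly one k1=v1,... record.

// Sketch of the conversion I'm after; parseRecord is an illustrative name only.
def parseRecord(line: String): Map[String, String] =
  line.split(",")                             // "k1=v1,k2=v2" -> ["k1=v1", "k2=v2"]
    .map(_.split("=", 2))                     // "k1=v1" -> ["k1", "v1"]
    .collect { case Array(k, v) => k -> v }   // drop malformed pairs with no "="
    .toMap

// Mapping over the RDD and collecting should then give the Array:
// val result: Array[Map[String, String]] = rdd.map(parseRecord).collect()

For context, here is the full streaming job where I'm attempting this: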
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date

import com.typesafe.config.ConfigFactory
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Logging, GlobalConstants and filterNullImpressions are defined elsewhere in my project.
object SparkApp extends Logging {

  def main(args: Array[String]): Unit = {
    val myConfigFile = new File("../sparkconsumer/conf/spark.conf")
    val fileConfig = ConfigFactory.parseFile(myConfigFile).getConfig(GlobalConstants.CONFIG_ROOT_ELEMENT)
    val propConf = ConfigFactory.load(fileConfig)
    val topicsSet = propConf.getString(GlobalConstants.KAFKA_WHITE_LIST_TOPIC).split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> propConf.getString(GlobalConstants.KAFKA_BROKERS))

    val conf = new SparkConf().setMaster("local[2]").setAppName(propConf.getString(GlobalConstants.JOB_NAME))
    conf.set("HADOOP_HOME", "/usr/local/hadoop")
    conf.set("hadoop.home.dir", "/usr/local/hadoop")

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/checkpoint")

    // Lookup files, broadcast to the executors
    val apiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.API_FILE))
    val arrayApi = ssc.sparkContext.broadcast(apiFile.distinct().collect())
    val nonApiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.NON_API_FILE))
    val arrayNonApi = ssc.sparkContext.broadcast(nonApiFile.distinct().collect())

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    writeTOHDFS2(messages)

    ssc.start()
    ssc.awaitTermination()
  }

  def writeTOHDFS2(messages: DStream[(String, String)]): Unit = {
    val records = messages.window(Seconds(10), Seconds(10))
    val k = records.transform(rdd => rdd.map(r => r._2)).filter(x => filterNullImpressions(x))

    k.foreachRDD { singleRdd =>
      if (singleRdd.count() > 0) {
        // Each RDD element is one "k1=v1,k2=v2,..." record; parse it into a Map[String, String].
        val maps = singleRdd.map { line =>
          line.split(",")
            .map(_.split("=", 2))
            .collect { case Array(key, value) => key -> value }
            .toMap
        }
        val r = scala.util.Random
        val sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
        maps.saveAsTextFile("hdfs://localhost:8001/user/hadoop/spark/" + sdf.format(new Date()) + r.nextInt)
      }
    }
  }
}
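
If what I really need is the Array[Map[String, String]] on the driver, rather than text files on HDFS, I believe collecting the parsed RDD inside foreachRDD would produce it. A minimal sketch, assuming the windowed batch is small enough to fit in driver memory:

// Inside foreachRDD, after building `maps` as above:
val arrayOfMaps: Array[Map[String, String]] = maps.collect()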