2
votes

My RDD contains newline-separated records that look like this:

Single RDD

k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3

I want to convert it into an Array[Map[k, v]],

where each element of the array is a separate Map[k, v] corresponding to one record.

The array will contain N such maps, depending on the number of records in a single RDD.

I am new to both Scala and Spark. Any help with the conversion would be appreciated.

object SparkApp  extends Logging with App {


  override def main(args: Array[ String ]): Unit = {
    val myConfigFile = new File("../sparkconsumer/conf/spark.conf")
    val fileConfig = ConfigFactory.parseFile(myConfigFile).getConfig(GlobalConstants.CONFIG_ROOT_ELEMENT)
    val propConf = ConfigFactory.load(fileConfig)
    val topicsSet = propConf.getString(GlobalConstants.KAFKA_WHITE_LIST_TOPIC).split(",").toSet
    val kafkaParams = Map[ String, String ]("metadata.broker.list" -> propConf.getString(GlobalConstants.KAFKA_BROKERS))


    //logger.info(message = "Hello World , You are entering Spark!!!")
    val conf = new SparkConf().setMaster("local[2]").setAppName(propConf.getString(GlobalConstants.JOB_NAME))
    conf.set("HADOOP_HOME", "/usr/local/hadoop")
    conf.set("hadoop.home.dir", "/usr/local/hadoop")
    //Lookup

    // logger.info("Window of 5 Seconds Enabled")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/checkpoint")

    val apiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.API_FILE))
    val arrayApi = ssc.sparkContext.broadcast(apiFile.distinct().collect())

    val nonApiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.NON_API_FILE))
    val arrayNonApi = ssc.sparkContext.broadcast(nonApiFile.distinct().collect())


    val messages = KafkaUtils.createDirectStream[ String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, topicsSet)
    writeTOHDFS2(messages)
    ssc.start()
    ssc.awaitTermination()
  }



  def writeTOHDFS2(messages: DStream[ (String, String) ]): Unit = {
    val records = messages.window(Seconds(10), Seconds(10))
    val k = records.transform( rdd => rdd.map(r =>r._2)).filter(x=> filterNullImpressions(x))

    k.foreachRDD { singleRdd =>
      if (singleRdd.count() > 0) {


        val maps =  singleRdd.map(line => line.split("\n").flatMap(x => x.split(",")).flatMap(x => x.split("=")).foreach( x => new mutable.HashMap().put(x(0),x(1))) 


        val r = scala.util.Random
        val sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
        maps.saveAsTextFile("hdfs://localhost:8001/user/hadoop/spark/" + sdf.format(new Date())+r.nextInt)
      }
    }

  }

}
what is the format inside your rdd? Is it (k1, v1), (k2,v2), etc.? - jtitusj
Welcome to StackOverflow :) Can you include what you have tried - it will make it easier for us to understand what concepts you are struggling with. - Glennie Helles Sindholt
That looks straightforward. What have you tried? How many rows are in your RDD - do you really want an RDD[Map[...]]? - The Archetypal Paul

1 Answer

3
votes

Here's some code that should be pretty self-explanatory.

val lines = "k1=v1,k2=v2,k3=v3\nk1=v1,k2=v2\nk1=v1,k2=v2,k3=v3,k4=v4"

val maps = lines.split("\n")
  .map(line => line.split(",")
    .map(kvPairString => kvPairString.split("="))
    .map(kvPairArray => (kvPairArray(0), kvPairArray(1))))
  .map(_.toMap)

// maps is of type Array[Map[String, String]]

println(maps.mkString("\n"))

//  prints:
//  Map(k1 -> v1, k2 -> v2, k3 -> v3)
//  Map(k1 -> v1, k2 -> v2)
//  Map(k1 -> v1, k2 -> v2, k3 -> v3, k4 -> v4)
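
The snippet above assumes every pair contains exactly one "=". As a hedged variation, here is a minimal sketch that skips malformed pairs instead of throwing. The names `RecordParser`, `parseRecord` and `parseAll` are hypothetical helpers introduced for illustration and are not part of the question's code.

```scala
object RecordParser {
  // Hypothetical helper: turns "k1=v1,k2=v2" into Map("k1" -> "v1", "k2" -> "v2").
  def parseRecord(line: String): Map[String, String] =
    line.split(",")
      .map(_.split("=", 2))                              // limit 2 keeps any '=' inside the value
      .collect { case Array(k, v) => k.trim -> v.trim }  // drop pairs that have no '='
      .toMap

  // Hypothetical helper: one map per non-empty line of the input text.
  def parseAll(text: String): Array[Map[String, String]] =
    text.split("\n").filter(_.nonEmpty).map(parseRecord)
}
```

On an RDD[String] such as `singleRdd` from the question, the same helper could probably be applied with `singleRdd.flatMap(_.split("\n")).map(RecordParser.parseRecord)`, which yields an RDD[Map[String, String]]. Calling `.collect()` on that result gives an Array[Map[String, String]], but only do so if the data is small enough to fit on the driver.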

Word of advice - SO is not a "write code for me" platform. I understand that it's pretty hard to just dive into Scala and Spark, but next time please try to solve the problem yourself and post what you tried so far and which problems you ran into.