I am trying to merge multiple RDDs of strings into a single RDD of rows, in a specific column order. I tried building a Map[String, RDD[Seq[String]]] (where each Seq contains only one element) and then merging them into an RDD[Row], but it doesn't seem to work (the content of the RDD[Seq[String]] is lost). Does anyone have any ideas?
val t1: StructType
val mapFields: Map[String, RDD[Seq[String]]]
var ordRDD: RDD[Seq[String]] = context.emptyRDD
t1.foreach(field => ordRDD = ordRDD ++ mapFields(field.name))
val rdd = ordRDD.map(line => Row.fromSeq(line))
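The reason this loses the column structure is that `++` on RDDs is a union: it appends the elements of one RDD after the other instead of aligning them element-wise. A minimal sketch of the difference, using local Seqs as stand-ins for the RDDs (the `++` and `zip` semantics are the same):

```scala
object UnionVsZip {
  def main(args: Array[String]): Unit = {
    // Two "columns", each value wrapped in a one-element Seq as in the question
    val col1 = Seq(Seq("a1"), Seq("a2"))
    val col2 = Seq(Seq("b1"), Seq("b2"))

    // ++ (union) stacks the elements: four one-element rows, not two two-element rows
    val unioned = col1 ++ col2
    println(unioned) // List(List(a1), List(a2), List(b1), List(b2))

    // zip aligns them element-wise: two two-element rows, one per output Row
    val zipped = col1.zip(col2).map { case (l, r) => l ++ r }
    println(zipped) // List(List(a1, b1), List(a2, b2))
  }
}
```

So the union produces one long column of single values, and `Row.fromSeq` then sees one-element rows instead of one row per record.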
EDIT :
Using the zip function led to a Spark exception, because my RDDs didn't have the same number of elements in each partition. I don't know how to make sure they all have the same number of elements per partition, so I just zipped each of them with its index and then joined them in the right order using a ListMap. Maybe there is a trick with the mapPartitions function, but I don't know the Spark API well enough yet.
val mapFields: Map[String, RDD[String]]
var ord: ListMap[String, RDD[String]] = ListMap()
t1.foreach(field => ord = ord ++ Map(field.name -> mapFields(field.name)))
// Note : zip = SparkException: Can only zip RDDs with same number of elements in each partition
//val rdd: RDD[Row] = ord.toSeq.map(_._2.map(s => Seq(s))).reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map{ case (l1, l2) => l1 ++ l2 }).map(Row.fromSeq)
val zipRdd = ord.toSeq.map(_._2.map(s => Seq(s)).zipWithIndex().map{ case (d, i) => (i, d) })
val concatRdd = zipRdd.reduceLeft((rdd1, rdd2) => rdd1.join(rdd2).map{ case (i, (l1, l2)) => (i, l1 ++ l2)})
val rowRdd: RDD[Row] = concatRdd.map{ case (i, d) => Row.fromSeq(d) }
val df1 = spark.createDataFrame(rowRdd, t1)
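The zip-with-index-and-join logic above can be checked locally without Spark; this sketch mimics it with plain Scala collections (the names `IndexJoinMerge` and `mergeColumns` are hypothetical, not Spark API), pairing each value with its index and joining columns on that shared index:

```scala
object IndexJoinMerge {
  // Merge several equally-sized "columns" by pairing each value with its index
  // and joining on that index, mirroring zipWithIndex + join on RDDs.
  def mergeColumns(columns: Seq[Seq[String]]): Seq[Seq[String]] = {
    // Each column becomes a map index -> one-element Seq, like (i, Seq(s)) pairs
    val indexed: Seq[Map[Long, Seq[String]]] =
      columns.map(_.zipWithIndex.map { case (v, i) => (i.toLong, Seq(v)) }.toMap)
    indexed
      .reduceLeft { (m1, m2) =>
        m1.map { case (i, l) => (i, l ++ m2(i)) } // join on the shared index
      }
      .toSeq
      .sortBy(_._1) // restore row order (the RDD join does not guarantee it either)
      .map(_._2)
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeColumns(Seq(Seq("a1", "a2"), Seq("b1", "b2"), Seq("c1", "c2")))
    println(merged) // List(List(a1, b1, c1), List(a2, b2, c2))
  }
}
```

Note that, unlike `zip`, the join-by-index approach does not preserve ordering on its own, which is why the Spark version carries the index through until `Row.fromSeq`.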
Each RDD will become a column. The RDDs are supposed to have the same size, so I don't think it's necessary to take that situation into account. - belgacea