I have now re-framed my question.
I'm learning Scala and Spark. I'm aware that I can create an RDD directly from a CSV file instead of creating a DataFrame and converting it to an RDD, but I'm trying the combination below.
Creating a Scala ListBuffer and a Spark DataFrame, and converting the DataFrame to an RDD:
scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer
scala> var src_policy_final = new ListBuffer[List[Any]]
src_policy_final: scala.collection.mutable.ListBuffer[List[Any]] = ListBuffer()
scala> var src_policy_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("sparktest/policy_details.csv")
src_policy_df: org.apache.spark.sql.DataFrame = [policy_id: int, otherdetails: string]
scala> var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)
src_rdd: org.apache.spark.rdd.RDD[List[Any]] = MapPartitionsRDD[40] at map at <console>:26
scala> var src_pol_list = src_rdd.collect.toList
src_pol_list: List[List[Any]] = List(List(10110000, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))
Using a Scala for loop, I'm iterating over the collected RDD records to replace a column value (policy_id with a surrogateId), as shown below:
scala> for(pol_details <- src_pol_list){
| src_policy_final += pol_details.toList.map(e => if(e==10110000) 1345678 else e)
| }
I'm changing a specific column value of each record using .map(e => if (e == orig_pol_id) ref_surr_id else e) and adding the records to the ListBuffer[List[Any]]. Once I finish iterating through all the records in the RDD, I want to write the ListBuffer[List[Any]] values to the HDFS file system as a CSV file using saveAsTextFile("/sparktest/policy_details").
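As an aside, the same replacement can be written without the mutable ListBuffer by mapping over the collected list directly. A minimal sketch, where orig_pol_id and ref_surr_id are placeholders standing in for the actual id values:

// Hypothetical ids standing in for the real values
val orig_pol_id = 10110000
val ref_surr_id = 1345678

// Replace the matching column value in every record functionally,
// avoiding the mutable ListBuffer and the explicit for loop
val src_policy_final = src_pol_list.map(pol_details =>
  pol_details.map(e => if (e == orig_pol_id) ref_surr_id else e))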
When I do println of src_policy_final, the output is:
scala> println(src_policy_final)
ListBuffer(List(1345678, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))
Now, I'm writing the modified data back to the HDFS file system by converting the ListBuffer[List[Any]] into an RDD:
scala> var src_write = sc.parallelize(src_policy_final.toList)
src_write: org.apache.spark.rdd.RDD[List[Any]] = ParallelCollectionRDD[43] at parallelize at <console>:53
Writing to HDFS file system:
scala> src_write.saveAsTextFile("sparktest/pol_det")
The output data looks like:
List(1345678, This is the first policy)
List(10456200, This is the second policy)
List(10345300, This is the third policy)
The output that I would like to get is:
1345678, This is the first policy
10456200, This is the second policy
10345300, This is the third policy
I'm not sure how to write the output in the format I need.
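If it helps, one way to get plain comma-separated lines instead of the List(...).toString rendering is to join each record's elements with mkString before saving. A minimal sketch, assuming the src_write RDD from above; the output path is illustrative:

// Join each record's elements with ", " so every output line reads
// "1345678, This is the first policy" rather than "List(1345678, ...)"
src_write.map(_.mkString(", ")).saveAsTextFile("sparktest/pol_det_csv")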
I hope I have explained better what I'm trying to achieve. Could you please help?
A DataFrame's .rdd is an RDD[Row], not a List of Scala buffers... You're seeing it print the toString representation of the objects because it doesn't know what to do with them. Ideally, you should be using a Dataset object, though, not an RDD – OneCricketeer
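To illustrate that suggestion, the whole pipeline can stay in the DataFrame/Dataset API instead of collecting to the driver and rebuilding an RDD. A sketch assuming the two-column schema above; the hard-coded ids and the output path are illustrative:

import org.apache.spark.sql.functions.{col, when}

// Replace policy_id with the surrogate id using column expressions,
// keeping the data distributed instead of collecting it to the driver
val replaced = src_policy_df.withColumn(
  "policy_id",
  when(col("policy_id") === 10110000, 1345678).otherwise(col("policy_id")))

// Spark's CSV writer emits plain comma-separated lines directly
replaced.write.csv("sparktest/pol_det_ds")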