
I have now re-framed my question.

I'm learning Scala and Spark. I'm aware that I can create an RDD directly from a CSV file instead of creating a DataFrame and converting it to an RDD, but I'm trying the combination below.
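
Just for context, the direct-RDD route I mean is roughly this (only a sketch, it is not what I'm asking about):

// Read the CSV straight into an RDD, skipping the DataFrame
// (this keeps the header line and splits naively on commas).
val direct_rdd = sc.textFile("sparktest/policy_details.csv").map(_.split(","))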

Creating a Scala ListBuffer and a Spark DataFrame, and converting the DataFrame to an RDD:

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> var src_policy_final = new ListBuffer[List[Any]]
src_policy_final: scala.collection.mutable.ListBuffer[List[Any]] = ListBuffer()

scala> var src_policy_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("sparktest/policy_details.csv")
src_policy_df: org.apache.spark.sql.DataFrame = [policy_id: int, otherdetails: string]

scala> var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)
src_rdd: org.apache.spark.rdd.RDD[List[Any]] = MapPartitionsRDD[40] at map at <console>:26

scala> var src_pol_list = src_rdd.collect.toList
src_pol_list: List[List[Any]] = List(List(10110000, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))

Using a Scala for loop, I'm iterating over the collected RDD records to replace a column value (policy_id with a surrogate ID), as shown below:

scala> for(pol_details <- src_pol_list){
     | src_policy_final += pol_details.toList.map(e => if(e==10110000) 1345678 else e)
     | }

I'm changing the specific column value of each record using .map(e => if(e==orig_pol_id) ref_surr_id else e) and adding the records to a ListBuffer[List[Any]]. Once I finish iterating through all the records in the RDD, I want to write the ListBuffer[List[Any]] values as a CSV file into the HDFS file system using saveAsTextFile("/sparktest/policy_details").

When I do println of src_policy_final, the output is:

scala> println(src_policy_final)
ListBuffer(List(1345678, This is the first policy), List(10456200, This is the second policy), List(10345300, This is the third policy))

Now, I'm writing the modified data back to the HDFS file system by converting the ListBuffer[List[Any]] into an RDD:

scala> var src_write = sc.parallelize(src_policy_final.toList)
src_write: org.apache.spark.rdd.RDD[List[Any]] = ParallelCollectionRDD[43] at parallelize at <console>:53

Writing to the HDFS file system:

scala> src_write.saveAsTextFile("sparktest/pol_det")

The output data looks like:

List(1345678, This is the first policy)
List(10456200, This is the second policy)
List(10345300, This is the third policy)

The output that I would like to get is:

1345678, This is the first policy
10456200, This is the second policy
10345300, This is the third policy

I'm not sure how to write the output in the format I require.

I hope I have explained better what I'm trying to achieve. Could you please help?

Is there any issue with my question, or am I not explaining it clearly? – Vijay B
Hi @VijayB, welcome to Stack Overflow. You may take a look at "how to ask" to improve this and future questions. Especially, you should provide some "research effort" and/or some code to show you already tried to solve the problem by yourself. – Luis Miguel Mejía Suárez
BTW, you may take a look at this question: stackoverflow.com/questions/39173039/… – Luis Miguel Mejía Suárez
Not clear why this is tagged with Spark. If you just want to write HDFS CSV data, you can use an OutputStream and write directly to HDFS. – OneCricketeer
Anyway, you need to use RDD[Row], not a list of Scala buffers... You're seeing it print the toString representation of the objects because it doesn't know what else to do with them. Ideally, you should be using a Dataset, though, not an RDD. – OneCricketeer

1 Answer


I really do not understand everything you want to do...
But since you said that you are learning, I will try to explain everything step by step. I hope it helps you.

First, some advice from a colleague who switched from Java to Scala a couple of years ago: avoid mutation as much as possible and force yourself to think and program in a functional way. Thus, use val instead of var and immutable collections instead of mutable ones, as sketched right below.
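
For example, here is a minimal sketch of your for loop and ListBuffer rewritten without any mutation (it reuses src_pol_list and the literal ids from your question; it is only an illustration, the typed approach further down is what I actually recommend):

// Same replacement as your for loop, but as a pure transformation:
// no var, no ListBuffer, just a map over the already-collected list.
val src_policy_final_immutable: List[List[Any]] =
  src_pol_list.map(policy => policy.map(e => if (e == 10110000) 1345678 else e))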

Second, avoid using things of type Any as much as possible, for example here...

var src_rdd = src_policy_df.rdd.map(_.toSeq.toList)

... you may fetch the values you want from each Row in a more typed way, for example:

val src_rdd = src_policy_df.rdd.map { row =>
   (
      row.getAs[Int](fieldName = "policy_id"),
      row.getAs[String](fieldName = "otherdetails")
   )
}
// src_rdd: RDD[(Int, String)]

Or even better, use a Dataset (a Typed DataFrame).

import spark.implicits._ // spark is an instance of SparkSession
final case class Policy(policy_id: Int, otherdetails: String)
val src_dataset = src_policy_df.as[Policy] // implicit Encoder needed here, provided by the spark implicits.
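
Also, if you just want to look at the data while learning, you do not need to collect it; a quick sketch (printSchema and show only display a sample on the driver):

src_dataset.printSchema()           // policy_id: integer, otherdetails: string
src_dataset.show(truncate = false)  // prints the first rows as a table, handy for debugging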

In Spark, you should never collect your data, except as the last step of your computation pipeline (and in most cases this is only done in a debug phase, because in general you save the result to an external data store like HDFS or Mongo), or if you are sure you have a small RDD which you want to make accessible to other transformations as a lookup table or something similar (for example, this is very common on reduced pair RDDs, thus there exists the reduceByKeyLocally method, which returns a Map; there is a small sketch of it after the next example).
Why? Because collect brings all the data that was distributed across the Executors to the Driver, which means you are no longer using the framework to parallelize your computations.
What you should do is build your computation using the Transformations provided by Spark, for example map.

val orig_pol_id = 10110000
val ref_surr_id = 1345678

// Using RDDs.
val src_policy_final_rdd = src_rdd.map {
  case (id, otherdetails) if (id == orig_pol_id) => (ref_surr_id, otherdetails)
  case policy => policy // default case, nothing changes.
}

// Using Datasets.
val src_policy_final_dataset = src_dataset.map {
  case policy if (policy.policy_id == orig_pol_id) => policy.copy(policy_id = ref_surr_id) // the copy method returns a new Policy with the provided fields changed.
  case policy => policy // default case, nothing changes.
}
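
And here is the reduceByKeyLocally sketch mentioned above; the pair RDD and its numbers are made up just for the example:

// A small pair RDD with made-up sample data.
val premiums = sc.parallelize(Seq(("policyA", 10.0), ("policyA", 5.0), ("policyB", 7.5)))

// Unlike reduceByKey (which returns another RDD), reduceByKeyLocally reduces by key
// and returns a plain scala.collection.Map on the driver, which is fine when you
// know the result is small enough to be used as a lookup table.
val totals: scala.collection.Map[String, Double] = premiums.reduceByKeyLocally(_ + _)
// totals contains: policyA -> 15.0, policyB -> 7.5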

Finally, when writing the RDD to HDFS, Spark uses the default toString on each element to print each line. So you may need to format each element before saving.

val write_rdd = src_policy_final_rdd.map {
   case (id, otherdetails) => s"$id,$otherdetails"
}
// write_rdd: RDD[String]
write_rdd.saveAsTextFile("sparktest/pol_det")

Or, if you are using a Dataset, you can use the DataFrameWriter API to handle all of that for you. (Recommended)

src_policy_final_dataset
  .write
  .option("header", "true")
  .option("sep", ",") // ',' is the default separator, but I prefer to be specific.
  .csv("sparktest/pol_det")
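
If you want to double-check the result, you can read the files back with the same reader API (just a quick sketch, using the same output path as above):

// Read the written CSV back, only to verify the output.
val check_df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("sparktest/pol_det")
check_df.show(truncate = false)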

This should address your whole question.

PS: Two final notes.
First, in general this question is "too broad" to be asked/answered on SO, so try to limit your scope and be clearer next time ;).
Second, you may try reading about Spark first and doing quick tutorials to get more comfortable with the framework. BTW, this is a short Spark workshop I made for the office a few days ago; it was intended for non-Scala developers, I hope it helps you too :)