
I am trying out examples of creating Datasets, and the code below works:

val lname = List(("Krishna", 32, "GWL"), ("Pankaj", 37, "BIHAR"), ("Sunil", 29, "Bangalre"))
import spark.implicits._
val rddLName = spark.sparkContext.parallelize(lname)
case class Test1(name: String, age: Int, place: String)
val ds1 = lname.toDS()
val ds2 = rddLName.toDS()
val ds3 = spark.createDataset(rddLName).as("Test1")
val ds4 = rddLName.toDF().as("Test1")

a) How do I use as[U](implicit enc: Encoder[U]) to create a Dataset? I have tried the code below, and it gives me the following error. Could you point me to a reference?

Error:(41, 62) Unable to find encoder for type Test1. An implicit Encoder[Test1] is needed to store Test1 instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._

val rddNew = lname map { case (x, y, z) => Test1(x, y, z) }
val ds5 = spark.sparkContext.parallelize(lname).toDF().as[Test1]
ds5.show()

The following code is also not supported: val ds5 = spark.sparkContext.parallelize(rddNew).toDF()

b) ds4.show() gives me the headers _1, _2 and _3, as shown below:

+-------+---+--------+
|     _1| _2|      _3|
+-------+---+--------+
|Krishna| 32|     GWL|
| Pankaj| 37|   BIHAR|
|  Sunil| 29|Bangalre|
+-------+---+--------+

How do I get name, age and place as headers, using a schema that I provide?


1 Answer

case class Test1(name: String, age: Int, place: String)

This must be a top-level class; you can't declare it inside a method, or Spark won't be able to find an Encoder for it (see the sketch below).
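
For illustration, a minimal sketch of that layout (the object name DatasetExample and the SparkSession builder settings are just placeholders):

import org.apache.spark.sql.SparkSession

// Declared at the top level (or inside an object), NOT inside a method,
// so that spark.implicits._ can derive an implicit Encoder[Test1].
case class Test1(name: String, age: Int, place: String)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ds-example").getOrCreate()
    import spark.implicits._

    val lname = List(("Krishna", 32, "GWL"), ("Pankaj", 37, "BIHAR"), ("Sunil", 29, "Bangalre"))
    // Encoder[Test1] is now found implicitly, so toDS() compiles.
    val ds = lname.map { case (n, a, p) => Test1(n, a, p) }.toDS()
    ds.show()
  }
}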

ds4.show() gives me the headers _1, _2 and _3, as shown below

Your list contains tuples, not objects, so Spark has no clue how the columns should be named:

List(("Krishna", 32, "GWL"), ("Pankaj", 37, "BIHAR"), ("Sunil", 29, "Bangalre"))

1. Provide column names:

import org.apache.spark.sql.Dataset
import spark.implicits._

val ds: Dataset[Test1] = List(("Krishna", 32, "GWL"), ("Pankaj", 37, "BIHAR"), ("Sunil", 29, "Bangalre"))
  .toDF("name", "age", "place")
  .as[Test1]
ds.show()

+-------+---+--------+
|   name|age|   place|
+-------+---+--------+
|Krishna| 32|     GWL|
| Pankaj| 37|   BIHAR|
|  Sunil| 29|Bangalre|
+-------+---+--------+

2. Explicitly rename the columns:

val ds5 = spark.sparkContext.parallelize(lname).toDF()
  .withColumnRenamed("_1", "name")
  .withColumnRenamed("_2", "age")
  .withColumnRenamed("_3", "place")
  .as[Test1]
ds5.show()

+-------+---+--------+
|   name|age|   place|
+-------+---+--------+
|Krishna| 32|     GWL|
| Pankaj| 37|   BIHAR|
|  Sunil| 29|Bangalre|
+-------+---+--------+
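
To address part (a) directly: as[U] only needs an Encoder[U] in scope. With Test1 declared at the top level, import spark.implicits._ already provides that encoder implicitly; you can also build one explicitly with Encoders.product and pass it yourself. A minimal sketch, assuming the same spark session and the original lname list of tuples (the val names test1Encoder and ds6 are just placeholders):

import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

// Build the encoder explicitly instead of relying on the implicit one.
val test1Encoder: Encoder[Test1] = Encoders.product[Test1]

val ds6 = lname
  .toDF("name", "age", "place")   // name the tuple columns first
  .as[Test1](test1Encoder)        // as[U](implicit Encoder[U]) with the encoder passed explicitly
ds6.show()

Either way, the requirement is the same as above: Test1 must be a top-level class, not one declared inside a method.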