I have the following class. Its run method returns a list of ints from a database table.

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}

The following code

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  runJob.run(d) // d expected to be int
})
processed.saveAsTextFile("c:\\temp\\mpa")

gets the following error:

[error] ...\src\main\scala\main.scala:39: type mismatch;
[error]  found   : org.apache.spark.sql.Row
[error]  required: Int
[error]       runJob.run(d)
[error]                  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I tried

  1. val processed = itemListJob.run(rc, priority).select("id").as[Int].map(d =>
  2. case class itemListRow(id: Int); ....as[itemListRow].

Both produced the following error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Update: I tried adding the implicits import:

  1. import sc.implicits._ fails with

    value implicits is not a member of org.apache.spark.SparkContext

  2. import sqlContext.implicits._ compiles. However, the later statement processed.saveAsTextFile("c:\\temp\\mpa") then fails with

    value saveAsTextFile is not a member of org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)]

What's the error when using as[Int]? – zsxwing
The error is: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. – ca9163d9
Did you try import spark.implicits._? – zsxwing
import spark.implicits._ fails with: not found: value spark – ca9163d9
You need to create a spark session as described here: spark.apache.org/docs/latest/… – Steffen Schmitz

1 Answer

Simply change the select("id") call to the following:

select("id").as[Int]

For this to compile, you also need to import the implicits that provide the encoder for converting Rows to Ints:

import sqlContext.implicits._ // <-- import implicits that add the "magic"
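
On Spark 2.x, where SparkSession is the entry point, the same import is available as spark.implicits._, which is the form the error message itself suggests. A minimal sketch, assuming a local session; the object name SessionSketch and the in-memory Seq standing in for the JDBC table are hypothetical and not part of the original code:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object SessionSketch {
  // Build (or reuse) a local SparkSession; app name and master are placeholders.
  def localSession(): SparkSession =
    SparkSession.builder().appName("Test").master("local[*]").getOrCreate()

  // Turn a single-column "id" DataFrame into a Dataset[Int].
  // The Seq is a stand-in for the table that the JDBC read would return.
  def ids(spark: SparkSession): Dataset[Int] = {
    import spark.implicits._ // brings Encoder[Int] and toDF into scope
    Seq(1, 2, 3).toDF("id").select("id").as[Int]
  }
}
```

If you keep the SQLContext, import sqlContext.implicits._ stays valid; both imports supply the same encoders.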

You could also change run to include the conversion as follows (note the comments to the lines I added):

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ // <-- import implicits that add the "magic"
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster)
    .as[Int] // <-- convert Row into Int
  }
}

value saveAsTextFile is not a member of org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)]

This compilation error occurs because saveAsTextFile is an RDD operation; it is not available on a Dataset.

In Spark SQL, writing goes through DataFrameWriter, which is available via the write operator:

write: DataFrameWriter[T] Interface for saving the content of the non-streaming Dataset out into external storage.

So you should do the following:

processed.write.text("c:\\temp\\mpa")
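
One caveat: the text data source accepts only a single string column, so a Dataset of Ints or tuples has to be rendered as strings first. A minimal sketch of two options, assuming the default toString rendering of each element is acceptable; WriteSketch and both helper names are hypothetical:

```scala
import org.apache.spark.sql.Dataset

object WriteSketch {
  // Option 1: map every element to a String, then use DataFrameWriter.text.
  def saveAsLines[T](ds: Dataset[T], path: String): Unit = {
    val spark = ds.sparkSession
    import spark.implicits._ // provides Encoder[String] for the map below
    ds.map(_.toString).write.text(path)
  }

  // Option 2: drop down to the underlying RDD, where saveAsTextFile exists.
  def saveViaRdd[T](ds: Dataset[T], path: String): Unit =
    ds.rdd.saveAsTextFile(path)
}
```

With this in place, WriteSketch.saveAsLines(processed, "c:\\temp\\mpa") would write one line per element. Both options fail if the target directory already exists, unless a save mode is set on the writer in the first case.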

Done!