I am trying to run some tests on my local machine with Spark Structured Streaming.
In batch mode, here is the Row that I am dealing with:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

// A single non-nullable column holding each record as a Map[String, String]
val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))

val rows = List(
  Row(
    Map(
      "ID" -> "1",
      "STRUCTUREID" -> "MFCD00869853",
      "MOLFILE" -> "The MOL Data",
      "MOLWEIGHT" -> "803.482",
      "FORMULA" -> "C44H69NO12",
      "NAME" -> "Tacrolimus",
      "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
      "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
      "METABOLISM" -> "The metabolism 500"
    )),
  Row(
    Map(
      "ID" -> "2",
      "STRUCTUREID" -> "MFCD00869854",
      "MOLFILE" -> "The MOL Data",
      "MOLWEIGHT" -> "603.482",
      "FORMULA" -> "",
      "NAME" -> "Tacrolimus2",
      "HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
      "SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
      "METABOLISM" -> "The metabolism 500"
    ))
)

val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)
Working with that in batch mode works like a charm, no issues.
Now I am trying to move to streaming mode using MemoryStream for testing. I added the following:
implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]
But the compiler complains as follows:
No implicits found for parameter evidence$1: Encoder[Row]
Hence my question: what should I do here to get that working?
I also saw that if I add the following import, the error goes away:
import spark.implicits._
Actually, I now get the following warning instead of an error:
Ambiguous implicits for parameter evidence$1: Encoder[Row]
I do not understand the encoder mechanism well and would appreciate it if someone could explain how to avoid these implicits. The reason is that I read the following in a book about creating a DataFrame from Rows.

Recommended approach:
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()
And then the author goes on with this:
In Scala, we can also take advantage of Spark’s implicits in the console (and if you import them in your JAR code) by running toDF on a Seq type. This does not play well with null types, so it’s not necessarily recommended for production use cases.
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
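If I understand the null caveat correctly, it is about encoder derivation: with a bare null the tuple would be inferred as (String, Null, Long), and Spark cannot derive an encoder for the Null type. As far as I can tell, a type ascription is the usual workaround (my own illustration, not from the book):

// Fails: the element type Null has no Spark encoder
// val broken = Seq(("Hello", null, 1L)).toDF("col1", "col2", "col3")

// Works: ascribing the null pins the element type to String
val ok = Seq(("Hello", null: String, 1L)).toDF("col1", "col2", "col3")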
If someone could take the time to explain what is happening in my scenario when I use the implicits, whether it is safe to do so, and whether there is a more explicit way to do it without importing the implicits, that would be great.
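For what it is worth, the most explicit variant I could come up with is to build the Encoder[Row] by hand from my schema instead of importing spark.implicits._; I am not sure this is idiomatic, and the use of RowEncoder here is my own assumption based on Spark 2.x:

import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream

// Build the Encoder[Row] explicitly from the schema defined above,
// instead of summoning one implicitly via spark.implicits._
implicit val rowEncoder: Encoder[Row] = RowEncoder(recordSchema)
implicit val ctx = spark.sqlContext

val rowsInput = MemoryStream[Row]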
Finally, if someone could point me to good documentation on Encoders and Spark type mapping, that would be great.
EDIT1
I finally got it to work with:
import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val ctx = spark.sqlContext
import spark.implicits._

val rows = MemoryStream[Map[String, String]]
val df = rows.toDF()
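To sanity-check it, I push data in and read it back through the in-memory sink, roughly like this (the query name testSink is just something I made up for the test):

// Feed one record into the stream
rows.addData(Map("ID" -> "1", "NAME" -> "Tacrolimus"))

// Write the stream to an in-memory table and query it
val query = df.writeStream
  .format("memory")
  .queryName("testSink")
  .outputMode("append")
  .start()

query.processAllAvailable()
spark.sql("select * from testSink").show(truncate = false)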
My problem here is that I am not confident about what I am doing. It seems that in some situations I need to create a Dataset first in order to convert it to a DataFrame of Rows with the toDF conversion. I understood that working with a Dataset is type-safe but slower than with a DataFrame. So why this intermediary step through a Dataset? This is not the first time I have seen this in Spark Structured Streaming. Again, if someone could help me with these questions, that would be great.
DataFrame is just a type alias for Dataset[Row]. You lose type-safety by using the Row object since it has no constraints on the data it contains. So, should you use Dataset[T] over a Dataset[Row]? We can use this great blog post from Databricks as a guideline: "If you want higher degree of type-safety at compile time, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset." – Rodrigo Hernández Mota
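To make that concrete for my case, here is a minimal sketch of the trade-off (the Record case class is hypothetical, mirroring a few of my fields):

// A typed alternative to Map[String, String]: a case class per record
case class Record(id: String, name: String, molWeight: Double)

import spark.implicits._

// Dataset[Record]: field names and types are checked at compile time
val ds = Seq(Record("1", "Tacrolimus", 803.482)).toDS()
val doubled = ds.map(r => r.molWeight * 2) // a typo in molWeight would not compile

// Dataset[Row], i.e. DataFrame: the same mistake only surfaces at runtime
val dfRow = ds.toDF()
// dfRow.select("molWeihgt") // typo compiles fine, fails with AnalysisException at runtime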