I am trying to extract the values from a Spark Structured Streaming DataFrame using Scala, with code like this:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

var txs = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKABS)
  .option("subscribe", "txs")
  .load()

txs = txs.selectExpr("CAST(value AS STRING)")

val schema = StructType(Seq(
  StructField("from", StringType, true),
  StructField("to", StringType, true),
  StructField("timestamp", TimestampType, true),
  StructField("hash", StringType, true),
  StructField("value", StringType, true)
))

txs = txs.selectExpr("cast(value as string) as json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
  .selectExpr("`from`", "`to`", "cast(timestamp as timestamp) as timestamp", "hash", "value")
val newDataFrame = txs
  .flatMap(row => {
    val from = row.getString(0)  // column order follows the schema: from, to, timestamp, hash, value
    val to = row.getString(1)
    // val timestamp = row.getTimestamp??
    // do stuff
  })
I am wondering if there is an equivalent typed get method for Timestamps? To add to my confusion, there seemed to be some sort of hidden mapping (hidden to me, at least) between the SQL types I am defining for my structured stream and the actual types of the variables when I access them through the flatMap function. I looked at the docs, and this was indeed the case. According to the documentation:
Returns the value at position i. If the value is null, null is returned. The following is a mapping between Spark SQL types and return types:
BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal
DateType -> java.sql.Date
TimestampType -> java.sql.Timestamp
BinaryType -> byte array
ArrayType -> scala.collection.Seq (use getList for java.util.List)
MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
StructType -> org.apache.spark.sql.Row
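Going by that mapping, this is the kind of thing I have been attempting inside the flatMap. It is just a sketch: getAs with an explicit type parameter is the closest thing I have found to a typed getter for timestamps, and I am not sure it is the intended approach:

// A sketch only, based on the documented mapping (TimestampType -> java.sql.Timestamp).
val attempt = txs.flatMap { row =>
  val from = row.getString(0)
  val to = row.getString(1)
  // getAs is the closest I found to a typed accessor for the timestamp column:
  val timestamp = row.getAs[java.sql.Timestamp]("timestamp")
  // do stuff with from/to/timestamp here...
  Seq(s"$from -> $to at $timestamp")  // placeholder output just so the sketch compiles (needs spark.implicits._ for the Encoder)
}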
Given that, I would have expected this mapping to be baked into the Row class more formally, as an interface it implements, but apparently that is not the case :( It seems that, in the case of TimestampType/java.sql.Timestamp, I have to abandon my timestamp type for something else? Someone please explain why I'm wrong! I've only been using Scala and Spark for 3-4 months now.
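To make it concrete, this is roughly the interface I imagined Row implementing (purely hypothetical on my part, not actual Spark code):

// Purely hypothetical -- just what I expected to find, not the real Spark API.
trait TypedGetters {
  def getTimestamp(i: Int): java.sql.Timestamp
  def getDate(i: Int): java.sql.Date
  def getDecimal(i: Int): java.math.BigDecimal
  // ...and so on, one typed accessor per entry in the mapping above
}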
-Paul