
I am trying to extract the values from a Spark streaming DataFrame using Scala, with code like this:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._ // for the $"json" column syntax below

var txs = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKABS)
  .option("subscribe", "txs")
  .load()
txs = txs.selectExpr("CAST(value AS STRING)")

val schema = StructType(Seq(
  StructField("from", StringType, true),
  StructField("to", StringType, true),
  StructField("timestamp", TimestampType, true),
  StructField("hash", StringType, true),
  StructField("value", StringType, true)
))

txs = txs.selectExpr("cast (value as string) as json")
            .select(from_json($"json", schema).as("data"))
            .select("data.*")
            .selectExpr("from","to","cast(timestamp as timestamp) as timestamp","hash","value") 
val newDataFrame = txs
  .flatMap(row => {
    val from = row.getString(0)
    val to = row.getString(1)
    // val timestamp = row.getTimestamp??

    // do stuff
  })

I am wondering if there is an equivalent typed get method for Timestamps? To add to my confusion, it seemed there was some sort of hidden mapping (hidden to me, at least) between the SQL types I define for my structured stream and the actual types of the variables when I access them through the flatMap function. I looked at the docs, and this was indeed the case. According to the documentation:

Returns the value at position i. If the value is null, null is returned. The following is a mapping between Spark SQL types and return types:

BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal
DateType -> java.sql.Date
TimestampType -> java.sql.Timestamp
BinaryType -> byte array
ArrayType -> scala.collection.Seq (use getList for java.util.List)
MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
StructType -> org.apache.spark.sql.Row

Given that, I would have expected this mapping to be baked into the Row class more formally, as an interface that it implements, but apparently that is not the case :( It seems that in the case of TimestampType/java.sql.Timestamp, I have to abandon my timestamp type for something else? Someone please explain why I'm wrong! I've only been using Scala and Spark for 3-4 months now.

-Paul


1 Answer


You have correctly deduced that the Scala type of a TimestampType column is java.sql.Timestamp.

As of v1.5.0, org.apache.spark.sql.Row has a getTimestamp(i: Int) method, so you can call it and get a java.sql.Timestamp. Given your column order (from, to, timestamp, hash, value), the timestamp is at index 2:

val timestamp = row.getTimestamp(2)

Even if you are on an earlier version, there's no need to abandon this type; you can simply use getAs[T](i: Int) with java.sql.Timestamp:

val timestamp = row.getAs[java.sql.Timestamp](2)
// OR:
val timestamp = row.getAs[java.sql.Timestamp]("timestamp")