1
votes

I'm reading data in Spark from a MongoDB collection with the official MongoDB Spark Connector with the following code:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
            builder().
            appName("MongoDB to SQL").
            getOrCreate()

// readConfig is a standard MongoDB ReadConfig (defined elsewhere)
val df = MongoSpark.load(spark, readConfig)
df.count()

The readConfig is a standard MongoDB read config and works fine. The problem I have is with some date/time values I'm getting from MongoDB as strings: the connector fails to cast them to the Spark type TimestampType:

 INFO DAGScheduler: Job 1 failed: count at transfer.scala:159, took 3,138191 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver):
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a TimestampType (value: BsonString{value='2999.12.31 14:09:34'})
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:200)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

From what I see when calling df.printSchema() before the .count(), the attribute in question is listed as:

|    |    |    |-- endDate: string (nullable = true)

In MongoDB the endDate is also stored as a String. Does Spark do an additional step here to detect a schema, and then fail to cast it? From looking at the source at https://github.com/mongodb/mongo-spark/blob/master/src/main/scala/com/mongodb/spark/sql/MapFunctions.scala#L181 it only does simple mappings there, no complicated casts.

Used versions: Mongo-Scala-Driver 2.4.0, Mongo-Spark-Connector 2.3.0, Spark 2.3.1

2
I am facing the same issue, but with a String to Integer cast error. Even proper sampling doesn't solve the problem. – prakharjain

2 Answers

0
votes

If I understand your issue correctly, it seems that you need to convert the date string into a timestamp (including the time zone) using unix_timestamp and cast it to TimestampType.

If you have a df with the schema [id: int, date: string]:

// adjust the date pattern to your data, e.g. "yyyy.MM.dd HH:mm:ss" for "2999.12.31 14:09:34"
val res = df.select($"id", $"date", unix_timestamp($"date", "yyyy/MM/dd HH:mm:ss").cast(TimestampType).as("timestamp"), current_timestamp(), current_date())

res.printSchema

root
 |-- id: integer (nullable = false)
 |-- date: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- current_timestamp(): timestamp (nullable = false)
 |-- current_date(): date (nullable = false)

You can have a look at this example: https://docs.databricks.com/_static/notebooks/timestamp-conversion.html
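For the date format shown in the question ("2999.12.31 14:09:34"), a minimal sketch of the same approach (assuming endDate was loaded as a string, as in the printSchema output) could look like this:

import org.apache.spark.sql.functions.{col, unix_timestamp}
import org.apache.spark.sql.types.TimestampType

// Parse the dotted date format and cast the resulting epoch seconds to a timestamp.
// The column name "endDate" is taken from the question's printSchema output.
val withTimestamp = df.withColumn(
  "endDateTs",
  unix_timestamp(col("endDate"), "yyyy.MM.dd HH:mm:ss").cast(TimestampType)
)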

0
votes

Maybe another field of your schema causes this fault, not 'endDate'. Nothing in the error output you show says that 'endDate' is the cause.

By default, the MongoDB Connector for Spark samples 1000 documents of your collection to infer the type of every field and build its schema. If a field contains mixed data types, such as both string and datetime values, the connector may happen not to sample any of the string values and will infer that field as a datetime type. Then, when you call count(), the connector tries to load the data from MongoDB into the inferred Spark DataFrame type and fails with the error "Cannot cast STRING into a TimestampType".

The Solution:

Increase the sample size used by the MongoDB Connector for Spark so that it builds a correct schema. For example, in PySpark:

df = session.read.format("com.mongodb.spark.sql.DefaultSource").option('sampleSize', 50000).load()
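Since the question's code is in Scala, a roughly equivalent sketch (assuming the MongoDB connection URI is already supplied via the Spark configuration) would be:

// Sample more documents for schema inference (the connector's default is 1000),
// so a field containing both strings and datetimes is more likely to be inferred as a string.
val df = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("sampleSize", "50000")
  .load()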