I am trying to load a large Mongo collection into Apache Spark using the Scala Mongo connector.

I am using the following versions:

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0" 
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" 
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14

The collection contains large integer-valued decimals greater than 1e13. I would like to load it as a Dataset of a corresponding case class, Output, defined as:

case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)

If I use MongoSpark.load without specifying the case class:

val ds = MongoSpark.load(sc, rc).toDS[Output]

then the connector infers the schema by randomly sampling the collection. This results in an arbitrary scale for the value attribute, and any document whose value overflows the sampled scale ends up with a missing value in the resulting Dataset. This is obviously not desired.
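
For example, the inferred schema can be inspected directly (a small sketch, assuming the same sc and rc as above); the scale reported for value depends on which documents the random sample happens to hit:

import com.mongodb.spark.MongoSpark

val inferred = MongoSpark.load(sc, rc).toDF()  // schema inferred from a random sample of documents
inferred.printSchema()                         // the scale shown for `value` can differ between runs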

Alternatively, according to the documentation for the Mongo Spark connector, I can set the schema explicitly by supplying a case class as a type parameter to load, e.g.:

val ds = MongoSpark.load[Output](sc, rc).toDS[Output]

However, in the case-class definition I can only declare value as a BigDecimal, which does not let me state the desired precision and scale explicitly. The resulting schema uses the default precision and scale of (38,18), which is not always desired:

root
 |-- time: long (nullable = false)
 |-- pubKeyId: long (nullable = false)
 |-- value: decimal(38,18) (nullable = true)
 |-- outIndex: long (nullable = false)
 |-- outTxId: long (nullable = false)
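
This default does not come from Mongo at all but from Spark's encoder mapping for BigDecimal, which can be checked with only the case class above (a small sketch for illustration):

import org.apache.spark.sql.Encoders

Encoders.product[Output].schema.printTreeString()  // value is mapped to decimal(38,18) by default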

This is in contrast to the Spark SQL API, which allows the precision and scale to be specified explicitly using DecimalType, e.g.:

val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)

How can I request a specific scale and precision for large decimal types in the schema, similar to the code above, when loading Mongo collections into Apache Spark?

2 Answers

Per this and this, as far as I can tell, the mantissa and exponent in Decimal128 are of fixed size. Unless you can find evidence to the contrary, it therefore does not make sense for MongoDB to permit specifying scale and precision for its decimals.

My understanding is that relational databases use different floating-point types depending on the scale and precision (e.g. 32-bit vs 64-bit floats), but MongoDB preserves whatever type it is given. So if you want a shorter float, you need to make your application send that instead of the decimal type.
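
For illustration only, a sketch using the underlying BSON classes (the field name and values here are made up): the type the application writes is the type MongoDB stores.

import org.bson.Document
import org.bson.types.Decimal128

val asDouble  = new Document("value", 1.5e13)  // stored as a 64-bit BSON double
val asDecimal = new Document("value", new Decimal128(new java.math.BigDecimal("15000000000000")))  // stored as Decimal128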

I was able to do this by bypassing the load helper methods and calling toDF(schema) directly on a MongoSpark instance built with MongoSpark.builder():

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("time", LongType, nullable = false),
  StructField("pubKeyId", LongType, nullable = false),
  StructField("value", DecimalType(30, 0), nullable = false),
  StructField("outIndex", LongType, nullable = false),
  StructField("outTxId", LongType, nullable = false)
))

// spark.implicits._ must be in scope for the Output encoder used by .as[Output]
val outputs =
  MongoSpark.builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]

This results in the correct schema, and the data is read correctly into Spark without any missing values:

outputs.printSchema()

root
 |-- time: long (nullable = false)
 |-- pubKeyId: long (nullable = false)
 |-- value: decimal(30,0) (nullable = false)
 |-- outIndex: long (nullable = false)
 |-- outTxId: long (nullable = false)