I am trying to load a large Mongo collection into Apache Spark using the Scala Mongo connector.
I am using the following versions:
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14
The collection contains large integer-valued decimals, greater than 1e13. The Dataset I would like to obtain should use the corresponding case class Output, defined as:
case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)
If I use MongoSpark.load without specifying the case class:
val ds = MongoSpark.load(sc, rc).toDS[Output]
then Mongo infers the schema by randomly sampling the collection. This results in a random scale for the value attribute, and any documents whose value overflows the randomly-obtained scale have a missing value attribute in the resulting Dataset. This is obviously not desired.
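For illustration, this is roughly how I can see the sampled schema (a minimal sketch; sc and rc are the same SparkContext and ReadConfig used above):

// Load as a DataFrame so the connector's sampled schema can be printed
val inferredDf = MongoSpark.load(sc, rc).toDF()
inferredDf.printSchema()
// the precision/scale shown for value depends on which documents were sampled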
Alternatively, according to the documentation for the Mongo Spark connector, I can set the schema explicitly by specifying the case class as a type parameter to load, e.g.:
val ds = MongoSpark.load[Output](sc, rc).toDS[Output]
However, in the case-class definition I can only specify the type of value as BigDecimal, which does not allow me to state the desired scale and precision explicitly. The resulting schema uses the default precision and scale of (38,18), which is not always desired:
root
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(38,18) (nullable = true)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)
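As a sanity check, the same (38,18) default can be reproduced from the case class alone, independent of Mongo (a minimal sketch):

import org.apache.spark.sql.Encoders

// Spark maps scala.math.BigDecimal to its system default, DecimalType(38, 18)
Encoders.product[Output].schema.printTreeString()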
This is in contrast to the Spark SQL API, which allows the scale and precision to be specified explicitly using DecimalType, e.g.:
val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)
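For completeness, the schema I would ideally end up with for Output looks something like this (a sketch; DecimalType(30, 0) is just one example of a precision and scale that comfortably fits integer values above 1e13):

import org.apache.spark.sql.types._

// Explicit schema for the Output fields, with value as an integer-scaled decimal
val outputSchema = StructType(Seq(
  StructField("time", LongType, nullable = false),
  StructField("pubKeyId", LongType, nullable = false),
  StructField("value", DecimalType(30, 0)),
  StructField("outIndex", LongType, nullable = false),
  StructField("outTxId", LongType, nullable = false)
))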
How can I request a specific scale and precision for large decimal types in the schema, similar to the code above, when loading Mongo collections into Apache Spark?