2
votes

I am trying to read a big table to spark(~100M rows). The table is in PostGreSQL and we are reading as follows:

val connectionProperties = new Properties()
connectionProperties.put("user", "$USER")
connectionProperties.put("password", "$PASSWORD")

// val connection = DriverManager.getConnection("$SERVER", "$USER", "$PASSWORD")
//connection.isClosed()

val jdbc_url = s"jdbc:postgresql://${"$HOST"}:${$PORT}/${"$DB"}"
val df = spark.read.option("inferSchema", true).jdbc(jdbc_url, "$TABLE", connectionProperties)

But our SQL table has 2 columns as money datatype(in format $100,000.23). When reading in spark, it gets converted into double and throws exception.

We have tried doing : a) Casting column data to Double. But that is not helping as Spark also automatically casts to double. It is having problem with , in values. (used b) The data is already in DataFrame with commas. Trying to use PostgreSQL dialects(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala)

Any help will be really appreciated.

1
Did you try to invoke a SELECT query, with conversion of that lame Money datatype into a more portable datatype before Spark receives the data ? E.g. (select A, B, cast(C as decimal(16,2)) as CC from TBL) x - Samson Scharfrichter
Also decimal(<digits>,<of-which-decimal-digits>) avoids rounding errors that are inevitable with floating-point types. And play hell with financial use cases... - Samson Scharfrichter

1 Answers

2
votes

You can try to specify schema manually, read the column as a string then manually parse value defining User Defined Function.

To specify schema manually you need write something like this

    val schema =
  StructType(
    StructField("your-example-column1", IntegerType, true) ::
    StructField("your-money-column", StringType, true) :: Nil)
    spark.read.schema(schema)

See Spark Scala API :

To learn more about how to convert StringType to data type you need refer to this question