I am trying to read a table from a Postgres DB using spark-jdbc. For that, I have come up with the following code:
import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {
  var conf = new SparkConf().setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")
    .set("spark.default.parallelism", "20")
  val log = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)

  // Connection details come from a properties file
  val conFile = "/home/myuser/ReconTest/inputdir/testconnection.properties"
  val properties = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName = properties.getProperty("devUserName")
  val devPassword = properties.getProperty("devPassword")
  val driverClass = properties.getProperty("gpDriverClass")
  val tableName = "base.ledgers"

  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception: " + e.getMessage)
      System.exit(1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val gpTable = spark.read.format("jdbc")
      .option("url", connectionUrl)
      .option("dbtable", tableName)
      .option("user", devUserName)
      .option("password", devPassword)
      .load()

    // Count rows for source_system_name = ORACLE and period_year = 2017,
    // just to verify the connection works
    val rc = gpTable.filter(gpTable("source_system_name") === "ORACLE" && gpTable("period_year") === "2017").count()
    println("gpTable Count: " + rc)
  }
}
Right now, I am fetching the row count just to see whether the connection succeeds or fails. It is a huge table, and the count runs slowly, which I understand is because I have given no parameters for the number of partitions or the column on which the data should be partitioned.
In a lot of places, I see the JDBC DataFrame being created in the following way:
val gpTable2 = spark.read.jdbc(connectionUrl, tableName, connectionProperties)
whereas I created mine in a different way, using options.
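From the API docs, I believe the spark.read.jdbc variant that takes the partitioning parameters looks roughly like this (the column name and bounds here are just made-up placeholders, and I may be misreading the signature):

val gpTable2Partitioned = spark.read.jdbc(
  connectionUrl,
  tableName,
  "period_year",   // column to partition on (placeholder)
  2010L,           // lowerBound (placeholder)
  2018L,           // upperBound (placeholder)
  10,              // numPartitions (placeholder)
  connectionProperties
)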
I am unable to understand how to specify numPartitions and the name of the column on which I want the data to be partitioned when the JDBC connection is formed using 'options':

val gpTable = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", tableName)
  .option("user", devUserName)
  .option("password", devPassword)
  .load()
Could anyone let me know:

1. How do I add the parameters numPartitions, lowerBound, and upperBound to the JDBC DataFrame created with options, as shown above?
2. How do I add just the partition column name and numPartitions? I want to fetch all the rows from the year 2017, and I don't want a range of rows to be picked (lowerBound, upperBound). My rough guess at the syntax follows below.
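For what it's worth, this is my guess at the options-based syntax. I am assuming the option names partitionColumn, lowerBound, upperBound, and numPartitions from examples I have seen elsewhere, so this may well be wrong:

val gpTablePartitioned = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", tableName)
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "period_year")  // assumed option name
  .option("lowerBound", "2017")              // do I even need these if I
  .option("upperBound", "2017")              // only want the year 2017?
  .option("numPartitions", "20")
  .load()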