0
votes

In Spark, the pushdown query is processed by the SQL engine of the database, and the dataframe is constructed from its result; Spark then queries the results of that query.

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) pushdown_query"""

val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.option("lowerBound", 0)   // COUNTRY_CODE values in the table range from 000 to 999
.option("upperBound", 999) // lowerBound/upperBound are required whenever partitionColumn is set
.load()

I can see from another reference (https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-queries-10x) that in Spark + MySQL, parallelism for a pushdown query is achieved by firing multiple queries based on the numPartitions and partitionColumn arguments. This is very similar to how Sqoop distributes its work. Say, for the example above, numPartitions = 4, partitionColumn = COUNTRY_CODE, and the COUNTRY_CODE values in our table fall in the range (000, 999).

Four queries are built and fired at the DB, and the dataframe is constructed from their results (with a parallelism of 4 in this case); a simplified sketch of how the ranges are derived follows the queries below.

Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE  <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE  <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE  <= 999
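Roughly, this is how those ranges come about (a simplified sketch, not Spark's exact partitioning code; lowerBound = 0 and upperBound = 999 are assumed from the COUNTRY_CODE range above):

// Spark splits [lowerBound, upperBound] on the partition column into numPartitions
// stripes and appends one range predicate per stripe to the pushdown query.
val lowerBound = 0L
val upperBound = 999L
val numPartitions = 4
val stride = (upperBound - lowerBound) / numPartitions // ~250 COUNTRY_CODE values per stripe/query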

The question I have now is: how can parallelism be achieved with this method in Spark (version 2.1) + HBase (query engine: BigSQL)? It is not giving me parallelism right now. Do the drivers that bridge Spark and HBase need an update, does Spark need to change something, or what kind of change would help achieve it? Some direction on this would help me. Thank you!


1 Answer

0
votes

To achieve the best performance, I would recommend starting your Spark job with --num-executors 4 and --executor-cores 1, because the JDBC connection is single-threaded and one task runs on one core per query (partition). With this configuration, while the job is running you can observe the tasks running in parallel, i.e. the core in each executor is in use.
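If you prefer to set this in code rather than on spark-submit, a minimal sketch using the equivalent configuration keys (the application name is just illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("jdbc-pushdown-read")            // illustrative name
  .config("spark.executor.instances", "4")  // equivalent of --num-executors 4 (on YARN)
  .config("spark.executor.cores", "1")      // equivalent of --executor-cores 1
  .getOrCreate()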

Use the DataFrameReader.jdbc overload below instead:

import java.util.Properties

val connectionProperties: Properties = new Properties
connectionProperties.put("user", "xxxx")
connectionProperties.put("password", "xxxx")
connectionProperties.put("fetchsize", "10000") // fetches 10000 records at once per task
connectionProperties.put("driver", "com.mysql.jdbc.Driver")

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) tbl_alias"""

val dbDataFrame = spark.read.jdbc(url, pushdownQuery, "COUNTRY_CODE", 0L, 999L, 4, connectionProperties)
// arguments: url, table/subquery, partitionColumn, lowerBound = 0, upperBound = 999, numPartitions = 4, connection properties
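As a quick sanity check (a sketch using the dbDataFrame above), you can confirm that the read produced 4 partitions, i.e. 4 JDBC queries will run in parallel when an action is triggered:

println(dbDataFrame.rdd.getNumPartitions) // expected: 4
dbDataFrame.explain()                     // inspect the physical plan for the JDBC relation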

Refer to https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String,lowerBound:Long,upperBound:Long,numPartitions:Int,connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame