
I want to execute a SQL query against a database in an Azure SQL Managed Instance using Azure Databricks. I have connected to the database using the Spark connector.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074", // SQL query
  "user"         -> "username",
  "password"     -> "*********"
))

// Run the custom query against dbo.Clients
val collection = sqlContext.read.sqlDB(config)
collection.show()

I am using the above method to fetch the data (example from the MSFT documentation). The tables have over 10 million rows in my case. My question is: how does Databricks process the query here?

Below is what the documentation says: "The Spark master node connects to databases in SQL Database or SQL Server and loads data from a specific table or using a specific SQL query. The Spark master node distributes data to worker nodes for transformation. The worker node connects to databases that connect to SQL Database and SQL Server and writes data to the database. Users can choose to use row-by-row insertion or bulk insert."

It says the master node fetches the data and distributes the work to the worker nodes later. In the above code, what happens if the query itself is complex and takes time to run while fetching the data? Does Spark spread that work across the worker nodes, or do I have to load the table's data into Spark first and then run the SQL query on it to get the result? Which approach do you suggest?


1 Answer


Using the above method, Spark pulls the table into the cluster over a single JDBC connection. If you want to push the query down to the database instead, you can wrap it in a subquery with a table alias and pass it as the table parameter, like this:

// The subquery is pushed down and executed in the database; Spark receives only its result set
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url = jdbcUrl, table = pushdown_query, properties = connectionProperties)
display(df)
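
For reference, jdbcUrl and connectionProperties are not defined in the snippet above; a minimal sketch of how they might be set up for an Azure SQL Managed Instance (the server name, database, and credentials below are placeholders) is:

import java.util.Properties

// Placeholder server, database, and credentials; adjust to your managed instance
val jdbcHostname = "mysqlserver.database.windows.net"
val jdbcPort     = 1433
val jdbcDatabase = "MyDatabase"
val jdbcUrl      = s"jdbc:sqlserver://$jdbcHostname:$jdbcPort;database=$jdbcDatabase"

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "*********")
// Microsoft SQL Server JDBC driver class
connectionProperties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")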

If you want to improve the performance, then you need to manage parallelism while reading. You can provide split boundaries based on the dataset's column values.

These options specify the parallelism of the read, and they must all be specified if any one of them is specified. lowerBound and upperBound decide the partition stride, but they do not filter the rows in the table, so Spark partitions and returns all rows in the table.
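
To make the stride concrete, here is a small illustrative sketch (an approximation, not Spark's exact internal logic) of the per-partition WHERE clauses that lowerBound = 1, upperBound = 100000, and numPartitions = 100 produce on the emp_no column:

// Illustrative only: approximates the per-partition predicates Spark generates
// from columnName/lowerBound/upperBound/numPartitions.
val lowerBound    = 1L
val upperBound    = 100000L
val numPartitions = 100
val stride = upperBound / numPartitions - lowerBound / numPartitions // ~1000 per partition

val predicates = (0 until numPartitions).map { i =>
  val start = lowerBound + i * stride
  val end   = start + stride
  if (i == 0)                      s"emp_no < $end OR emp_no IS NULL"
  else if (i == numPartitions - 1) s"emp_no >= $start"
  else                             s"emp_no >= $start AND emp_no < $end"
}

predicates.take(2).foreach(println)
// emp_no < 1001 OR emp_no IS NULL
// emp_no >= 1001 AND emp_no < 2001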

The following example splits the table read across executors on the emp_no column using the columnName, lowerBound, upperBound, and numPartitions parameters.

// Each of the 100 partitions opens its own JDBC connection and reads one emp_no range in parallel
val df = spark.read.jdbc(
  url = jdbcUrl,
  table = "employees",
  columnName = "emp_no",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 100,
  connectionProperties = connectionProperties)
display(df)
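
As a quick sanity check (assuming the df from the example above), you can confirm how many partitions the read produced. You can also combine a pushdown subquery with the partitioning options so the filtering happens in the database while the filtered result is still read in parallel; the hire_date filter below is just a hypothetical example:

// Confirm the number of read partitions (should be 100 here)
println(df.rdd.getNumPartitions)

// Sketch: pushdown filter plus parallel read; the aliased subquery acts as the "table"
val filtered = spark.read.jdbc(
  url = jdbcUrl,
  table = "(select * from employees where hire_date >= '1999-01-01') emp_filtered", // hypothetical filter
  columnName = "emp_no",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 100,
  connectionProperties = connectionProperties)
filtered.show(5)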

For more details, use this link.