1
votes

I am reading 1 GB of CSV file ( record count : 10 million, columns : 13 ) and trying to dump it into the SQL server. Below are the infra details :

  • CSV file location : azure blob storage

  • Code : Spark + Scala

  • Cluster : Databricks Size : enter image description here

  • Code Used to read the file and dump it :

    val df = spark.read.format(fileparser_config("fileFormat").as[String]).option("header", fileparser_config("IsFirstRowHeader").toString).load(fileparser_config("FileName").as[String]).withColumn("_ID", monotonically_increasing_id)

    val bulkCopyConfig = Config(Map( "url" -> connConfig("dataSource").as[String], "databaseName" -> connConfig("dbName").as[String], "user" -> connConfig("userName").as[String], "password" -> connConfig("password").as[String], "dbTable" -> tableName, "bulkCopyBatchSize" -> "500000", "bulkCopyTableLock" -> "true", "bulkCopyTimeout" -> "600"))

    println(s" ${LocalDateTime.now()} ************ sql bulk insert start ************")

    df.bulkCopyToSqlDB(bulkCopyConfig)

    println(s" ${LocalDateTime.now()} ************ sql bulk insert end ************")

  • Problem :

the cluster goes into a limbo and my job never completes. One time when it ran long enough it threw an error :

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 38.0 failed 4 times, most recent failure: Lost task 13.3 in stage 38.0 (TID 1532, 10.0.6.6, executor 4): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.\n\tat com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:227)\n\tat com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:796)\n\tat com.microsoft.sqlserver.jdbc.SQLServ
  • Cluster Event logs :

enter image description here

  • Other observations :

    1. While the job runs for a very long time, the cluster is not completely unresponsive. I tried this by submitting more jobs in that same window. The job ran but took comparatively more time than usual( around 10x time)
    2. I tried increasing the worker nodes and the node type ( even chose 128 GB nodes ) but still the outcome was same.
    3. While the job was running, I tried checking the SQL table row count with nolock query. I ran this after 3-4 minutes while the job was running, it gave me around 2 million records in the table. But when I ran it again after 10 minutes, the query kept running forever and never returned any records.
    4. I have tried tweaking the bulkCopyBatchSize property but it hasnt helped much.
    5. I have tried to remove the sqlinsertion code and used an aggregation operation on the dataframe that i create from 1 GB file and the entire thing takes only 40-50 seconds, so the problem is only with sql driver/sql server.
1

1 Answers

0
votes

I was facing the same issue.

Azure SQL Server - Standard S7: 800 DTUs

HDInsight - 6 node (2 D13V2 Head and 4 D13V2 Worker)

Data Size - 100GB Parquet with 1.7 billion rows.

Initially I was using "bulkCopyTimeout" as 600 seconds and I observed the loading was restarting after the timeout passed. Then I changed the timeout to a very large value and it worked fine.

For performance improvement:

Create columnstore Index in the target table and use

"bulkCopyBatchSize" = 1048576 (Load whole batch into row-groups maximum capacity and compresses them directly into the column-store rather than loading into delta store and compressing later)

"bulkCopyTableLock" = "false" (in order to allow parallelism)