4
votes

I have 27 million records in an XML file that I want to push into an Elasticsearch index. Below is the code snippet, written in Spark Scala. I'll be building a Spark job JAR and running it on AWS EMR.

How can I use Spark efficiently to complete this exercise? Please guide.

I have a gzipped XML file of 12.5 GB which I am loading into a Spark DataFrame. I am new to Spark. (Should I split this gzip file, or will the Spark executors take care of it?)

import org.apache.spark.sql.{DataFrame, SparkSession, functions}

class ReadFromXML {

  def createXMLDF(): DataFrame = {
    val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
    import spark.implicits._
    val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)

    val new_df: DataFrame = m_df.select($"CountryCode"(0).as("countryCode"),
        $"PostalCode"(0).as("postalCode"),
        $"state"(0).as("state"),
        $"county"(0).as("county"),
        $"city"(0).as("city"),
        $"district"(0).as("district"),
        $"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
        $"FullStreetName"(0).as("street"),
        functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
        .where($"LocationList.Location._primary" === "true")
        .where("(array_contains(_languageCode, 'en'))")
        .where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
    

    new_df.drop("name")
  }
}

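import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._ // brings saveToEs into scope on DataFrames

object PushToES extends App {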
  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .master("local[*]")
    .config("spark.es.nodes", "awsurl")
    .config("spark.es.port", "port")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.net.ssl", "true")
    .getOrCreate()

  val extractor = new ReadFromXML()

  val df = extractor.createXMLDF()
  df.saveToEs("myindex/_doc")
}

Update 1: I have split the file into chunks of 68 MB each, and reading a single chunk takes 3.7 minutes. I was trying to use Snappy instead of the gzip compression codec, so I converted the .gz file into a Snappy file and added the following to the config:

.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

But it returns an empty DataFrame.

df.printSchema returns just "root".

Update 2: I have managed to get it running with the LZO format. It takes far less time to decompress and load into a DataFrame.

Is it a good idea to iterate over each LZO-compressed file of 140 MB and create a DataFrame per file? Or

should I load a set of 10 files into a DataFrame? Or

should I load all 200 LZO-compressed files of 140 MB each into a single DataFrame? If yes, how much memory should be allocated to the master, as I think this will be loaded on the master?

When reading files from an S3 bucket, can the "s3a" URI improve performance, or is the "s3" URI OK for EMR?

Update 3: To test a small set of 10 LZO files, I used the configuration below. The EMR cluster took 56 minutes overall, of which the step (the Spark application) took 48 minutes to process the 10 files.

1 Master - m5.xlarge 4 vCore, 16 GiB memory, EBS only storage EBS Storage:32 GiB

2 Core - m5.xlarge 4 vCore, 16 GiB memory, EBS only storage EBS Storage:32 GiB

With the following tuned Spark parameters, learned from https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.dynamicAllocation.enabled": "false",
      "spark.driver.memory": "10800M",
      "spark.executor.memory": "10800M",
      "spark.executor.cores": "2",
      "spark.executor.memoryOverhead": "1200M",
      "spark.driver.memoryOverhead": "1200M",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.storage.level": "MEMORY_AND_DISK_SER",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.default.parallelism": "4"
    }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapreduce.map.output.compress": "true"
    }
  }
]
Is Spark a hard constraint in your task? 12 GB of data might not be a big enough dataset to justify a big data solution, and a simple console tool with an event-based XML parser may work just fine in your case. Can you please elaborate on why Spark was chosen, and whether you would consider a solution not based on EMR/Spark? - fenixil
@fenixil 12 GB is the gzipped size; the data contains 27 million records. Can you elaborate on which other options are available? If one fits my needs, I can switch to it. Can I run local Spark to push this much data? I am new to big data. - happy
stackoverflow.com/questions/16302385/… Gzip files are not splittable. You can also check the Databricks API github.com/databricks/spark-xml, or you can write a custom implementation of the DataSourceV2 reader API to read the gzipped XML, convert it into Spark Row objects, and register them in Spark. - kavetiraviteja
@kavetiraviteja I am using Databricks spark-xml, and the above code works fine for the gzipped file. My concern is whether the above code requires any modification to work more efficiently with Spark's features. As I am new to Spark, have I used it properly? Or I thought splitting the XML file into smaller files could improve performance? - happy
"I thought splitting the XML file into smaller files could improve performance?" Yes, it will, but not too small; make sure you split them to a decent size. What are your executor memory and executor cores settings? - kavetiraviteja

2 Answers

3
votes

Not a complete answer, but a bit too long for a comment. Here are a few tips I would like to suggest.

It's not entirely clear, but I assume your concern here is execution time. As suggested in the comments, you can improve performance by adding more nodes/executors to the cluster. If the gzip file is loaded into Spark as a single partition, you should split it into files of a reasonable size (not too small, which makes processing slow; not too big, or the executors will run out of memory).

Parquet is a good file format when working with Spark. If you can, convert your XML to Parquet; it is well compressed and lightweight.

Regarding your comments: coalesce does not do a full shuffle. It changes the number of partitions by moving data from some partitions into existing ones, so it obviously cannot increase the number of partitions. Use repartition instead. The operation is costly, but it can increase the number of partitions. See this for more details: https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4
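As a rough sketch of the Parquet suggestion: pay the XML parsing cost once, write the result as Parquet, and let later runs read the Parquet copy. The object name `ParquetStaging`, the method `stage`, and the `path` argument are hypothetical names chosen for illustration; `df` is assumed to be a DataFrame you have already loaded from the XML.

```scala
import org.apache.spark.sql.DataFrame

object ParquetStaging {
  // Writes df to `path` as Parquet (replacing any previous copy)
  // and returns a DataFrame that reads from the Parquet files.
  def stage(df: DataFrame, path: String): DataFrame = {
    df.write.mode("overwrite").parquet(path)
    df.sparkSession.read.parquet(path)
  }
}
```

Subsequent jobs can then start from `spark.read.parquet(path)` instead of re-parsing the XML.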
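The difference can be checked on a small stand-in DataFrame. This is a minimal sketch, assuming a single-partition DataFrame behaves like what you get from one non-splittable gzip file; `PartitionDemo` and `partitionCounts` are hypothetical names, and the target of 8 partitions is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

object PartitionDemo {
  // Returns (partitions after coalesce(8), partitions after repartition(8))
  // for a DataFrame that starts with exactly one partition.
  def partitionCounts(spark: SparkSession): (Int, Int) = {
    val single = spark.range(0L, 1000L, 1L, 1).toDF("id")
    val coalesced = single.coalesce(8)         // no shuffle, cannot grow: stays at 1
    val repartitioned = single.repartition(8)  // full shuffle: becomes 8
    (coalesced.rdd.getNumPartitions, repartitioned.rdd.getNumPartitions)
  }
}
```

With the question's `coalesce(5)`, a DataFrame read from a single gzip file would therefore still have one partition.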

3
votes

Here are some of the tips from my side.

Read the data in Parquet or any other format, then repartition it as needed. Converting the data beforehand may take time, so read it in Spark and process it there. Try to create the index mapping and format the data before starting the load; this makes debugging easier when the mapping is complex.

  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .enableHiveSupport()
    .getOrCreate()


val batchSizeInMB = 4 // change it as you need
val batchRetryCount = 3
val batchWriteRetryWait = 10
val batchEntries = 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val port = 9200 // placeholder: use your cluster's port
val esNodes = Seq("yourNode1", "yourNode2", "yourNode3") // placeholder host names
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.nodes" -> esNodes.mkString(","))
esConfig = esConfig + ("es.port" -> port.toString)
esConfig = esConfig + ("es.batch.size.bytes" -> (batchSizeInMB * 1024 * 1024).toString)
esConfig = esConfig + ("es.batch.size.entries" -> batchEntries.toString)
esConfig = esConfig + ("es.batch.write.retry.count" -> batchRetryCount.toString)
esConfig = esConfig + ("es.batch.write.retry.wait" -> batchWriteRetryWait.toString)
esConfig = esConfig + ("es.batch.write.refresh" -> "false")
if (enableSSL) {
  esConfig = esConfig + ("es.net.ssl" -> "true")
  esConfig = esConfig + ("es.net.ssl.keystore.location" -> "identity.jks")
  esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed" -> "true")
}
if (wanOnly) {
  esConfig = esConfig + ("es.nodes.wan.only" -> "true")
}

// This helps if some task fails, so data won't be duplicated
if (enableIdempotentInserts) {
  esConfig = esConfig + ("es.mapping.id" -> "your_primary_key_column")
}

val df: org.apache.spark.sql.DataFrame = ??? // suppose you created it from Parquet or any other format

Data is actually inserted at the executor level, not at the driver level. Try giving only 2-4 cores to each executor so that not too many connections are open at the same time. You can vary the batch size in bytes or in entries as suits you. Please read up on these settings.

Write data in chunks; this will help you load larger datasets in the future. Try creating the index mapping before loading data. Prefer lightly nested data, since ES supports it, and try to keep a primary key in your data.
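To make the connection count concrete, here is a back-of-the-envelope sketch. It assumes one writing task (and so one connection) per executor core, and that the bulk buffer set by es.batch.size.bytes is allocated per task; `EsWriteLoad` and `WritePlan` are hypothetical names used only for this illustration.

```scala
object EsWriteLoad {
  // Rough model of the write pressure a job puts on Elasticsearch.
  final case class WritePlan(executors: Int, coresPerExecutor: Int, batchSizeMb: Int) {
    // One task per core, so this many tasks can write at the same time.
    def concurrentWriters: Int = executors * coresPerExecutor
    // Upper bound on bulk data in flight if every task has a full batch.
    def peakBulkMb: Int = concurrentWriters * batchSizeMb
  }
}
```

For example, `EsWriteLoad.WritePlan(10, 4, 4)` gives 40 concurrent writers and up to 160 MB of bulk requests in flight, which is the number to weigh against what the ES cluster can absorb.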

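import org.apache.spark.sql.functions.{floor, rand}
import org.elasticsearch.spark.sql._ // provides saveToEs on DataFrames
import spark.implicits._

// Assign each row a random salt in 0..9 so the load can be written in 10 chunks
val dfToInsert = df.withColumn("salt", floor(rand() * 10).cast("int")).persist()
for (i <- 0 until 10) {
  val start = System.currentTimeMillis
  val finalDF = dfToInsert.filter($"salt" === i)
  val counts = finalDF.count()
  println(s"count of records in chunk $i -> $counts")
  finalDF.drop("salt").saveToEs("indexName", esConfig)
  val totalTime = System.currentTimeMillis - start
  println(s"ended loading data for chunk $i. Total time taken in seconds: ${totalTime / 1000}")
}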

Try to give an alias to the index your final DataFrame is written to, and update that alias on each run, since you would not want to disturb your production server during the load.

Memory

This cannot be generic, but here is something to get you started.

Keep 10-40 executors, depending on your data size and budget. Give each executor 8-16 GB of memory plus 5 GB of overhead (this can vary, as your documents may be large or small). If needed, set maxResultSize to 8 GB. The driver can have 5 cores and 30 GB of RAM.
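Expressed as Spark properties, that starting point might look like the sketch below. The values are simply the low end of the ranges above, not a recommendation for any particular cluster, and `StarterConf` is a hypothetical name. Driver memory and cores are left out on purpose: they have to be set before the driver JVM starts (for example through spark-submit or spark-defaults), so setting them from application code is too late.

```scala
import org.apache.spark.SparkConf

object StarterConf {
  // Starting values only; tune them by measuring insertion time across runs.
  def build(): SparkConf = new SparkConf(false)
    .set("spark.executor.instances", "10")
    .set("spark.executor.cores", "4")
    .set("spark.executor.memory", "8g")
    .set("spark.executor.memoryOverhead", "5g")
    .set("spark.driver.maxResultSize", "8g")
}
```

The resulting configuration can be passed to `SparkSession.builder().config(StarterConf.build())`.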

Important Things.

  • Keep the config in a variable so that you can change it per index.

  • Insertion happens on the executors, not on the driver, so try to keep fewer connections open while writing. Each core opens one connection.

  • Documents can be batched by number of entries or by size in bytes. Adjust both based on what you learn over multiple runs.

  • Try to make your solution robust. It should be able to handle data of any size. Both reading and writing can be tuned, but try to format your data according to the document mapping before starting the load. This makes debugging easier if the documents are somewhat complex and nested.

  • The memory given to spark-submit can also be tuned based on what you learn while running jobs. Watch the insertion time as you vary memory and batch size.

  • The most important thing is design. If you are using ES, create your mapping with the end queries and requirements in mind.