I have 27 million records in an XML file that I want to push into an Elasticsearch index. Below is a code snippet written in Spark Scala; I'll be packaging it as a Spark job JAR and running it on AWS EMR.
How can I use Spark efficiently to complete this exercise? Please guide.
I have a gzipped XML of 12.5 GB which I am loading into a Spark DataFrame. I am new to Spark. (Should I split this gzip file, or will the Spark executors take care of it?)
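For context, SparkUtils is a small internal helper; roughly it does something like the following sketch using spark-xml (the rowTag here is illustrative, not the real one). Note that a single .gz file is not splittable, so it lands in one partition:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of what SparkUtils.getDataFrame roughly does (rowTag is illustrative):
// read the XML with spark-xml and let Hadoop pick the decompression codec
// from the file extension.
object SparkUtils {
  def getSparkInstance(appName: String): SparkSession =
    SparkSession.builder().appName(appName).getOrCreate()

  def getDataFrame(spark: SparkSession, path: String): DataFrame =
    spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "Location") // illustrative row tag
      .load(path)
}

// A single .gz file is not splittable, so it loads as one partition:
// SparkUtils.getDataFrame(spark, "temp.xml.gz").rdd.getNumPartitions == 1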
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions

class ReadFromXML {

  def createXMLDF(): DataFrame = {
    val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
    import spark.implicits._

    val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)

    // explode Text into "name" first, so the alias can be referenced
    // in the select and where clauses below
    val new_df: DataFrame = m_df
      .withColumn("name", functions.explode($"Text"))
      .select(
        $"CountryCode"(0).as("countryCode"),
        $"PostalCode"(0).as("postalCode"),
        $"state"(0).as("state"),
        $"county"(0).as("county"),
        $"city"(0).as("city"),
        $"district"(0).as("district"),
        $"Identity.PlaceId".as("placeid"),
        $"Identity._isDeleted".as("deleted"),
        $"FullStreetName"(0).as("street"),
        $"name",
        $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
      .where($"LocationList.Location._primary" === "true")
      .where("array_contains(_languageCode, 'en')")
      .where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))

    new_df.drop("name")
  }
}
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._ // brings saveToEs into scope

object PushToES extends App {
  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .master("local[*]")
    .config("spark.es.nodes", "awsurl")
    .config("spark.es.port", "port")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.net.ssl", "true")
    .getOrCreate()

  val extractor = new ReadFromXML()
  val df = extractor.createXMLDF()
  df.saveToEs("myindex/_doc")
}
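If useful, the same write can also be expressed via the DataFrameWriter API, which exposes the es-hadoop bulk-sizing options; the option values below are illustrative starting points, not recommendations:

// Equivalent write through the DataFrameWriter API; the batch settings are
// real es-hadoop options, but the values here are illustrative guesses.
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.batch.size.entries", "5000")   // docs per bulk request
  .option("es.batch.size.bytes", "5mb")      // bytes per bulk request
  .option("es.batch.write.retry.count", "3") // retries on bulk failure
  .mode("append")                            // ES writes require append mode
  .save("myindex/_doc")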
Update 1: I have split the files into 68 MB each, and reading a single file takes 3.7 minutes. I was trying to use snappy instead of the gzip compression codec, so I converted the gz file into a snappy file and added the below to the config:
.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
But it returns an empty DataFrame:
df.printSchema returns just "root"
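(My understanding is that spark.io.compression.codec only controls Spark-internal data such as shuffle blocks and spills, not how input files are decoded; input decompression is chosen by whichever Hadoop codec matches the file extension.) For debugging, a quick sanity check one could run (path and rowTag are assumptions):

// Diagnostics for the empty-DataFrame case (path and rowTag are assumptions):
val test = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "Location")      // assumed row tag
  .load("temp.xml.snappy")
println(test.rdd.getNumPartitions)   // how was the input split?
println(test.count())                // 0 rows would explain the bare "root",
test.printSchema()                   // since the schema is inferred from rows found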
Update 2: I have managed to run with the LZO format; it takes much less time to decompress and load into a DataFrame.
Is it a good idea to iterate over each LZO-compressed file of 140 MB and create a DataFrame? Or
should I load a set of 10 files into a DataFrame? Or
should I load all 200 LZO-compressed files, each of 140 MB, into a single DataFrame (as sketched below)? If yes, then how much memory should be allocated to the master, as I think this will be loaded on the master?
While reading files from an S3 bucket, can the "s3a" URI improve performance? Or is the "s3" URI OK for EMR?
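A sketch of the all-at-once variant (bucket, prefix, and rowTag are made up). On EMR the s3:// scheme is served by EMRFS, which is the connector AWS recommends there, while s3a:// is the Apache Hadoop connector typically used off EMR:

// Sketch (bucket, prefix, and rowTag are assumptions): load all 200 files in
// one DataFrame via a glob. Partitions are distributed across executors, so
// the data is not materialized on the master/driver; the driver only needs
// memory for planning and for anything explicitly collected.
val all = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "Location")
  .load("s3://my-bucket/xml-parts/*.lzo") // s3:// resolves to EMRFS on EMR
println(all.rdd.getNumPartitions)         // plain .lzo splits only if indexed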
Update 3: To test on a small set of 10 LZO files, I used the below configuration. The EMR cluster took 56 minutes overall, of which the step (Spark application) took 48 minutes to process the 10 files.
1 Master - m5.xlarge (4 vCore, 16 GiB memory, EBS-only storage, 32 GiB EBS)
2 Core - m5.xlarge (4 vCore, 16 GiB memory, EBS-only storage, 32 GiB EBS)
With the below Spark parameters, tuned following https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ (a worked derivation of the memory figures follows the JSON):
[
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "false"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "false",
"spark.driver.memory": "10800M",
"spark.executor.memory": "10800M",
"spark.executor.cores": "2",
"spark.executor.memoryOverhead": "1200M",
"spark.driver.memoryOverhead": "1200M",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "4"
}
},
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.map.output.compress": "true"
}
}
]
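For what it's worth, the 10800M/1200M figures line up with the sizing formulas in the linked article; a rough worked derivation (the per-node YARN memory for m5.xlarge is approximate, not EMR-verified):

// Rough derivation of spark.executor.memory / memoryOverhead per the linked
// article's formulas; yarnMemMb is an approximate value for m5.xlarge.
val vCores = 4                                        // m5.xlarge
val executorCores = 2                                 // spark.executor.cores
val executorsPerNode = (vCores - 1) / executorCores   // = 1 (one core left for YARN/OS)
val yarnMemMb = 12288                                 // approx. YARN memory per node
val perExecutorMb = yarnMemMb / executorsPerNode      // = 12288
val executorMemMb = (perExecutorMb * 0.90).toInt      // ~11059, rounded down to 10800M
val overheadMb = (perExecutorMb * 0.10).toInt         // ~1228, rounded to 1200M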