I would like some suggestions on improving my Spark application design. I am new to Spark and have read the online documentation on performance tuning. When I run on the cluster with the full data load it is very slow, so please suggest how the application can be better tuned from a developer's perspective.
This is how I have developed my Spark application: the functionality is to read files present in HDFS, process them, and store the data in a Parquet Hive table. It is developed using Spark and Python (PySpark).
Each file is around 50 MB and there are around 50 files to process. It is a 3-node cluster (1 master + 2 slaves). Currently it takes around 4 hours to process the data. The config assigns 10 driver cores, 60 total executor cores, 7 GB executor memory, and 7 GB driver memory.
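For context, the SparkContext and HiveContext are created roughly like this (a sketch; the app name is a placeholder, and the property values match what is assigned in the config):

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# rough equivalent of the resource settings described above
conf = (SparkConf()
        .setAppName("hdfs_to_parquet")     # placeholder app name
        .set("spark.driver.cores", "10")
        .set("spark.driver.memory", "7g")
        .set("spark.executor.memory", "7g")
        .set("spark.cores.max", "60"))     # total executor cores
sc = SparkContext(conf=conf)
hc = HiveContext(sc)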
The script reads all the files using sc.textFile and creates a Python RDD. The RDD is assigned a schema and is processed row by row using lambda functions. Processing row by row is taking time. After processing, it stores the data into a Parquet table.
How do I find out how many RDDs are created and how much memory each RDD takes? How can this be improved?
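Is persisting an RDD and checking the Storage tab of the Spark UI the right way to see its memory footprint? Something like this (using rdd1 from the snippet below):

from pyspark import StorageLevel

rdd1.persist(StorageLevel.MEMORY_ONLY)   # cache so the size shows up in the Spark UI (Storage tab)
rdd1.count()                             # force evaluation so the RDD is actually materialised
print(rdd1.getStorageLevel())            # confirms the storage level in use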
Thank you for your suggestions.
Code Snippet:
# Input: list of files along with a metadata file.
# Each file name starts with a number that identifies the branch; the branch number is also a value in the base table.
# If there are 100 files, I can choose to process only a subset of them (e.g. 10 files).
import datetime
import numpy as np
from pyspark.sql import Row

# checking the metadata table
field_lst = [...]  # list of branch numbers to process (placeholder)
# build the IN-list once, e.g. ("101", "102")
in_list = '("' + '", "'.join(str(f) for f in field_lst) + '")'
sql_stmt = ('select filename from bd.tb where field1 in ' + in_list +
            ' and filename like "%_yyyy_xxxxx%"')
newdf = hc.sql(sql_stmt)
# preparing the list of files that need to be processed; this is a subset of the input files
files = newdf.toPandas().values.tolist()
newfiles = list(np.array(files).flat)
# processing the input file
pyrdd = sc.textFile(','.join(newfiles), use_unicode=False)
rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t')))
rdd2 = rdd1.map(lambda row: None if (row == '0' or row == '') else row)
# the input file has around 20 columns; each record is turned into a Row
rdd3 = rdd2.map(lambda row: Row(
    str(row.field1).lower().replace("'", "''").replace("\\", "\\\\").strip(),
    row.field2,
    row.field3,
    datetime.datetime.now()
))
df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3)
# read from the base table the rows whose field1 is not in the branch list (field_lst)
sql_stmt1 = 'select * from ' + base_table + ' where field1 not in ' + in_list
df3 = hc.sql(sql_stmt1)
new_df = df2.unionAll(df3)
# write the merged data to a temporary table first, then reload it and overwrite the base table
new_df.saveAsTable('tmp_tbl1', mode='overwrite')
df_new = hc.sql('select * from tmp_tbl1')
df_new.saveAsTable(base_table, mode='overwrite')
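For completeness, tbl_schema and SCHEMA_rdd3 are defined earlier in the script; they look roughly like this (trimmed to a few of the ~20 columns; the field names and load_ts are placeholders):

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Row "class" used to parse each tab-separated line (only a few of the ~20 fields shown)
tbl_schema = Row('field1', 'field2', 'field3')

# schema applied when rdd3 is converted back into a DataFrame
SCHEMA_rdd3 = StructType([
    StructField('field1', StringType(), True),
    StructField('field2', StringType(), True),
    StructField('field3', StringType(), True),
    StructField('load_ts', TimestampType(), True)
])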