2 votes

I would like some suggestions on improving my Spark application design. I am new to Spark and have read the online documentation on performance. When I run on the cluster with the full data load it is very slow, so please suggest how the application can be better tuned from a developer's perspective.

This is how I have developed my Spark application: the functionality is to read the files present in HDFS, process them, and store the data in a Parquet Hive table. It is developed using Spark and Python.

Each file is around 50 MB and there are around 50 files to process. It is a 3-node cluster (2 slaves + 1 master). Currently it takes around 4 hours to process the data. The config assigns 10 driver cores, 60 total executor cores, 7 GB executor memory and 7 GB driver memory.
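For reference, this is roughly how the contexts are created and the resources set (the app name is illustrative; in practice the driver settings are usually passed through spark-submit or spark-defaults.conf rather than in code):

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# roughly the resources described above; driver memory/cores normally have to be
# set on spark-submit or in spark-defaults.conf, shown here only for completeness
conf = (SparkConf()
        .setAppName('branch_file_loader')     # illustrative name
        .set('spark.executor.memory', '7g')
        .set('spark.cores.max', '60')         # total executor cores
        .set('spark.driver.memory', '7g')
        .set('spark.driver.cores', '10'))
sc = SparkContext(conf=conf)
hc = HiveContext(sc)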

The script reads all the files using sc.textFile and creates a Python RDD. The RDD is assigned a schema and is processed row by row using lambda functions. Processing row by row is taking time. After processing, the data is stored in a Parquet table.

How do I find out how many RDDs are created and how much memory each RDD takes? How can this be improved?

Thank you for your suggestions.

Code Snippet:

# Input: a list of files along with a metadata file.
# Each file name starts with a number identifying its branch, and the branch
# number is also a value in the base table.
# If there are 100 files, I can specify to process only, say, 10 of them.

import datetime
import numpy as ny
from pyspark.sql import Row

# checking the metadata table
field_lst = branch_list  # list of branch numbers to process (built elsewhere)
in_clause = '("' + '", "'.join('%s' % f for f in field_lst) + '")'
sql_stmt = ('from bd.tb select filename where field1 in ' + in_clause +
            ' and filename like "%_yyyy_xxxxx%"')
newdf = hc.sql(sql_stmt)

# preparing the list of files that need to be processed (a subset of the input files)
files = newdf.toPandas().values.tolist()
newfiles = list(ny.array(files).flat)

# processing the input files
pyrdd = sc.textFile(','.join(newfiles), use_unicode=False)
# tbl_schema is a Row/namedtuple class for the ~20 input columns (defined elsewhere)
rdd1 = pyrdd.map(lambda row: tbl_schema(*row.split('\t')))
# drop empty/placeholder rows (a filter, rather than mapping them to None)
rdd2 = rdd1.filter(lambda row: row not in ('0', ''))
# each row is rebuilt as a Row, cleaning field1 and adding a load timestamp
rdd3 = rdd2.map(lambda row: Row(
         str(row.field1).lower().replace("'", "''").replace("\\", "\\\\").strip(),
         row.field2,
         row.field3,
         datetime.datetime.now()
         ))
df2 = hc.createDataFrame(rdd3, SCHEMA_rdd3)

# reading from the base table the rows whose field1 is not in field_lst
sql_stmt1 = 'select * from ' + base_table + ' where field1 not in ' + in_clause
df3 = hc.sql(sql_stmt1)
new_df = df2.unionAll(df3)
new_df.saveAsTable('tmp_tbl1', mode='overwrite')
df_new = hc.sql('select * from tmp_tbl1')
df_new.saveAsTable(base_table, mode='overwrite')
Could you post some code? That might help us find where the bottleneck is. – jtitusj
For the record, Spark 2.0 can load CSV files directly into DataFrames (and with Spark 1.x you can use the "spark-csv" plug-in with a slightly different syntax and some CLASSPATH tweaks). All Spark benchmarks show that DataFrames process faster than raw RDDs with Scala, and much faster with Python. – Samson Scharfrichter
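Roughly, with Spark 1.4+ and the spark-csv package, that suggestion could look like the sketch below (the input path, the field names and the load_ts column are illustrative; C0, C1, ... are spark-csv's default column names for header-less files):

# submitted with: spark-submit --packages com.databricks:spark-csv_2.10:1.5.0 ...
from pyspark.sql import functions as F

# read the tab-delimited files straight into a DataFrame, skipping the Python RDD
df = (hc.read
        .format('com.databricks.spark.csv')
        .option('delimiter', '\t')
        .option('header', 'false')
        .load('hdfs:///path/to/selected/files/'))   # illustrative path

# the per-row lambdas become built-in column expressions that run inside the JVM
df2 = df.select(
    F.trim(F.lower(F.col('C0'))).alias('field1'),
    F.col('C1').alias('field2'),
    F.col('C2').alias('field3'),
    F.current_timestamp().alias('load_ts'))         # Spark 1.5+

Keeping the work in DataFrame expressions avoids serializing every row to a Python worker and back.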

2 Answers

1 vote

I suggest you use the Spark History Server to better understand your job. It can tell you:

How do I find out how many RDDs are created and how much memory each RDD takes?

(assuming you cache your RDDs or shuffle; otherwise they don't consume much memory)

The History Server can also show you the DAG of the job, potential GC issues, etc.
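Note that event logging has to be enabled for the History Server to see your application; typically something like the following in spark-defaults.conf (the HDFS path is just an example):

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-history
spark.history.fs.logDirectory    hdfs:///spark-history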

Processing row by row is taking time.

Since you already know this, you may want to focus on tuning that function, using unit tests and profiling. Pasting the actual code in your question makes it possible for people to help.
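For example, you could pull a small sample to the driver and profile the same per-row logic with cProfile (a rough sketch; pyrdd and tbl_schema are the names from your snippet, and the sample size is arbitrary):

import cProfile

sample = pyrdd.take(10000)          # small sample of raw lines on the driver

def process(lines):
    # the same per-row work as in your snippet, run locally
    out = []
    for line in lines:
        row = tbl_schema(*line.split('\t'))
        out.append(str(row.field1).lower().replace("'", "''").replace("\\", "\\\\").strip())
    return out

cProfile.run('process(sample)')     # shows which calls dominate the time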

0 votes

Thank you. I have added the code snippet to the question above.
