9
votes

I am a spark newbie and have a simple spark application using Spark SQL/hiveContext to:

  1. select data from a Hive table (1 billion rows)
  2. do some filtering and aggregation, including row_number over a window function to select the first row, group by, count() and max(), etc.
  3. write the result into HBase (hundreds of millions of rows)

I submit the job to a YARN cluster (100 executors) and it is slow. When I looked at the DAG Visualization in the Spark UI, it seemed that only the Hive table scan tasks were running in parallel, while the rest of steps #2 and #3 above appeared to run in a single instance. Can those steps be optimized to run in parallel?

The application looks like:

Step 1:

val input = hiveContext
  .sql("""
     SELECT
           user_id
           , address
           , age
           , phone_number
           , first_name
           , last_name
           , server_ts
       FROM
       (
           SELECT
               user_id
               , address
               , age
               , phone_number
               , first_name
               , last_name
               , server_ts
               , row_number() over
                (partition by user_id, address,  phone_number, first_name, last_name  order by user_id, address, phone_number, first_name, last_name,  server_ts desc, age) AS rn
           FROM
           (
               SELECT
                   user_id
                   , address
                   , age
                   , phone_number
                   , first_name
                   , last_name
                   , server_ts
               FROM
                   table
               WHERE
                   phone_number <> '911' AND
                   server_date >= '2015-12-01' and server_date < '2016-01-01' AND
                   user_id IS NOT NULL AND
                   first_name IS NOT NULL AND
                   last_name IS NOT NULL AND
                   address IS NOT NULL AND
                   phone_number IS NOT NULL
           ) all_rows
       ) all_rows_with_row_number
       WHERE rn = 1""")

// registerTempTable takes the table name as a String and returns Unit
input.registerTempTable("input_tbl")

Step 2:

val result = hiveContext.sql("""
  SELECT state,
         phone_number,
         address,
         COUNT(*) as hash_count,
         MAX(server_ts) as latest_ts
     FROM
    ( SELECT
         udf_getState(address) as state
         , user_id
         , address
         , age
         , phone_number
         , first_name
         , last_name
         , server_ts
     FROM
         input_tbl ) input
     WHERE state IS NOT NULL AND state != ''
     GROUP BY state, phone_number, address""")

Step 3:

result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)

The DAG Visualization looks like this: [image: DAG visualization of the job]

As you can see, "Filter", "Project" and "Exchange" in stage 0 each appear to run in only one instance, and the same goes for stage 1 and stage 2. So I have a few questions, and apologies if they are dumb:

  1. Do "Filter", "Project" and "Exchange" happen in the driver after data is shuffled from each executor?
  2. What code maps to "Filter", "Project" and "Exchange"?
  3. How can I run "Filter", "Project" and "Exchange" in parallel to improve performance?
  4. Is it possible to run stage 1 and stage 2 in parallel?
2
Did you check if the HBase connector allows predicate pushdown? If so, instead of pulling all the data from HBase, you can let HBase help you by filtering at least some of the data. The main bottleneck is usually I/O and networking. Some things are not clear in your code, though. What does your table represent? Is it a DataFrame created with data from HBase? What about your input data? I'm afraid the description is a bit broad. Would you mind revising your question, please? - eliasah
@eliasah, thanks for the comments. The data is pulled from Hive and stored into HBase. Agreed that the bottleneck is I/O and networking, especially since there is a lot of shuffling: 2TB of input data and 40GB of shuffle write. I learned that the less shuffling the better, but isn't the amount of shuffling also bound to how large the input data is? If so, I wonder what ratio (shuffle / input) would be a good one? - user_not_found

2 Answers

6
votes

You're not reading the DAG visualization correctly: the fact that each step is drawn as a single box does not mean that it isn't using multiple tasks (and therefore cores) to compute that step.

You can see how many tasks are used for each step by drilling down into the stage view, which displays all tasks for that stage.

For example, here's a sample DAG visualization similar to yours:

[image: sample DAG visualization]

You can see each stage is depicted by a "single" column of steps.

But if we look at the table below, we can see the number of tasks per stage:

[image: stages table showing the number of tasks per stage]

One of them is using only 2 tasks, but the other uses 220, which means the data is split into 220 partitions and the partitions are processed in parallel, given enough available resources.

If you drill down into that stage, you can see again that it used 220 tasks, along with details for all the tasks.

[image: stage detail view listing the tasks]

Only tasks reading data from disk are shown in the graph with these "multiple dots", to help you understand how many files were read.

So, as Rashid's answer suggests, check the number of tasks for each stage.
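
Besides the UI, the partition count (and therefore the task count of a stage) can also be read from code. Below is a minimal sketch, assuming a Spark 1.x style SQLContext running locally; the app name, the toy data and the value 8 are made up for illustration and are not taken from the question. It prints the partition counts before and after an aggregation, and calls explain so you can see where operators such as Filter, Project and Exchange sit in the physical plan.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PartitionCheck {
  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val sc = new SparkContext(
      new SparkConf().setAppName("partition-check").setMaster("local[4]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Partitions used by Spark SQL after a shuffle (the default is 200).
    // 8 is an arbitrary value for this toy example.
    sqlContext.setConf("spark.sql.shuffle.partitions", "8")

    // Toy input: 1000 rows spread over 4 partitions.
    val df = sc.parallelize(1 to 1000, 4).map(i => (i % 10, i)).toDF("key", "value")
    val grouped = df.groupBy("key").count()

    // One task is launched per partition in the stage computing each of these.
    println(s"input partitions:   ${df.rdd.partitions.length}")
    println(s"grouped partitions: ${grouped.rdd.partitions.length}")

    // Prints the logical and physical plans, including any Exchange (shuffle).
    grouped.explain(true)

    sc.stop()
  }
}
```

In the application from the question, the same two calls (`result.rdd.partitions.length` and `result.explain(true)`) could be applied to `result` directly.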

1
votes

The cause is not obvious, so I would do the following things to zero in on the problem.

  1. Measure the execution time of each step.
  2. The first step may be slow if your table is stored in text format. Spark usually works better if the data is stored in Hive in Parquet format.
  3. Check whether your table is partitioned by the column used in the WHERE clause.
  4. If saving data to HBase is slow, you may need to pre-split the HBase table, since by default the data is stored in a single region.
  5. Look at the Stages tab in the Spark UI to see how many tasks are started for each stage, and also check the data locality level, as described here
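
For item 1, a small timing helper is one way to do it. This is only a sketch: the name `timed` is hypothetical, and it is plain Scala with no Spark dependency. Keep in mind that Spark transformations are lazy, so the timed block has to contain an action (such as `count()` or the save call) for the measurement to mean anything.

```scala
// Hypothetical helper: runs `body`, prints the elapsed wall-clock time,
// and returns whatever `body` returned.
def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val out = body
  val ms = (System.nanoTime() - start) / 1e6
  println(f"$label took $ms%.1f ms")
  out
}

// Usage with the DataFrames from the question (requires an action inside):
//   timed("step 1") { input.count() }
//   timed("step 2") { result.count() }
```

Note that timing `result.count()` also includes the time to compute `input`, unless `input` was cached and materialized beforehand.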

Hopefully, you will be able to zero in on the problem.
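
For item 4, pre-splitting needs a list of split keys. Here is a minimal sketch of computing evenly spaced ones, under the assumption (not stated in the question) that row keys start with a two-character lowercase hex prefix, for example a hash of the key. The function name `hexSplitKeys` is made up for illustration.

```scala
// Hypothetical helper: evenly spaced split points for row keys that
// begin with a 2-character hex prefix ("00" to "ff").
// N regions need N - 1 split keys.
def hexSplitKeys(regions: Int): Seq[String] = {
  require(regions >= 2 && regions <= 256, "regions must be between 2 and 256")
  (1 until regions).map(i => f"${i * 256 / regions}%02x")
}

// The keys would then be converted to bytes and passed to HBase when
// creating the table, e.g. Admin.createTable(descriptor, splitKeys)
// with splitKeys = hexSplitKeys(n).map(Bytes.toBytes).toArray
```

If your row keys are not uniformly distributed over such a prefix, the split points should be derived from the actual key distribution instead.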