I am a Spark newbie and have a simple Spark application that uses Spark SQL/HiveContext to:
- select data from a Hive table (~1 billion rows)
- do some filtering and aggregation, including a row_number over window function to select the first row, plus group by, count(), max(), etc.
- write the result into HBase (hundreds of millions of rows)
I submit the job to a YARN cluster (100 executors). It's slow, and when I looked at the DAG Visualization in the Spark UI, it seems only the Hive table scan tasks were running in parallel; the rest of the steps (#2 and #3 above) ran in only one instance, which presumably could be parallelized?
The application looks like:
Step 1:
val input = hiveContext
.sql("""
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
, row_number() over
(partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
table
WHERE
phone_number <> '911' AND
server_date >= '2015-12-01' AND server_date < '2016-01-01' AND
user_id IS NOT NULL AND
first_name IS NOT NULL AND
last_name IS NOT NULL AND
address IS NOT NULL AND
phone_number IS NOT NULL
) all_rows
) all_rows_with_row_number
WHERE rn = 1""")
input.registerTempTable("input_tbl")
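One thing I have not changed is the shuffle parallelism; if that is what determines how many "Exchange" tasks run, this is a minimal sketch of how it would be set (spark.sql.shuffle.partitions is the Spark 1.x property; 200 is its default, which I am still using):

// spark.sql.shuffle.partitions controls how many partitions Spark SQL
// uses after an Exchange (shuffle); the default is 200.
hiveContext.setConf("spark.sql.shuffle.partitions", "200")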
Step 2:
val result = hiveContext.sql("""
SELECT state,
phone_number,
address,
COUNT(*) as hash_count,
MAX(server_ts) as latest_ts
FROM
( SELECT
udf_getState(address) as state
, user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
input_tbl ) input
WHERE state IS NOT NULL AND state != ''
GROUP BY state, phone_number, address""")
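In case it matters, udf_getState is a custom UDF that has to be registered before this query runs. A minimal sketch of one way to register it on the HiveContext (extractState is a hypothetical placeholder for the real address-to-state parsing logic, which I've omitted):

// Hypothetical helper: the real address-to-state parsing is omitted.
def extractState(address: String): String = {
  // ... parse the state out of the address string ...
  null
}
// Register the function under the name used in the SQL above.
hiveContext.udf.register("udf_getState", (address: String) => extractState(address))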
Step 3:
result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
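For context on Step 3: conf is a Hadoop job configuration for HBase's TableOutputFormat, and the map turns each row into an (ImmutableBytesWritable, Put) pair. A minimal sketch of that setup (the table name, row key layout and column family below are placeholders, not my real ones):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// Sketch: Hadoop job configuration for writing to HBase via TableOutputFormat.
// "result_table" is a placeholder table name.
val job = Job.getInstance(HBaseConfiguration.create())
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "result_table")
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val conf = job.getConfiguration

// Each output row (state, phone_number, address, hash_count, latest_ts)
// becomes one HBase Put; "cf" is a placeholder column family.
result.map { row =>
  val rowKey = Bytes.toBytes(s"${row.getString(0)}_${row.getString(1)}_${row.getString(2)}")
  val put = new Put(rowKey)
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("hash_count"), Bytes.toBytes(row.getLong(3)))
  (new ImmutableBytesWritable(rowKey), put)
}.saveAsNewAPIHadoopDataset(conf)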
The DAG Visualization looks like this:

[DAG visualization screenshot]
As you can see, the "Filter", "Project" and "Exchange" in Stage 0 run in only one instance, as do Stage 1 and Stage 2, so a few questions (apologies if they are dumb):
- Do "Filter", "Project" and "Exchange" happen in the driver after the data is shuffled from each executor?
- What code maps to "Filter", "Project" and "Exchange"? (A sketch for printing the physical plan follows this list.)
- How could I run "Filter", "Project" and "Exchange" in parallel to optimize the performance?
- Is it possible to run Stage 1 and Stage 2 in parallel?
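For reference, a minimal sketch of how to print the query plans, which should show which parts of the SQL become the "Filter", "Project" and "Exchange" operators:

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// the physical plan lists the Filter/Project/Exchange operators.
result.explain(true)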