Objective
I'm using Power BI Desktop with DirectQuery to a Spark cluster. I want to join two tables and aggregate by the MONTH and DEP_NAME columns. The fact table is 10 GB+ (it contains the MONTH column), while the Department table is only a few KB (it contains the DEP_ID and DEP_NAME columns). The expected result is very small, about 100 rows.
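To make the goal concrete, this is roughly the query I'd like to end up running on the cluster, sketched here in PySpark/Spark SQL (table and column names are taken from the Power Query below; `spark` is the ambient SparkSession as in a Databricks notebook; this is only an illustration of the goal, not what Power BI actually generates):

# Illustration only: the ~100-row result I expect, expressed as Spark SQL.
result = spark.sql("""
    SELECT f.MONTH, d.DEP_NAME, SUM(f.SALARY) AS TOTAL_SALARY
    FROM default.Fact f
    JOIN default.Department d ON f.DEP_ID = d.DEP_ID
    GROUP BY f.MONTH, d.DEP_NAME
""")
result.show(1000)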
Issue
Spark fails due to the following exception:
DataSource.Error: ODBC: ERROR [HY000] [Microsoft][Hardy] (35) Error from server: error code: '0' error message: 'Error running query: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 10 tasks (4.1 GB) is bigger than spark.driver.maxResultSize (4.0 GB)'.
I'm pretty sure Power BI tries to materialize the join result (10 GB+) before applying the aggregation.
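For context, the 4.0 GB cap in the error is the current value of the Spark property spark.driver.maxResultSize. Raising it would only be a stopgap (the 10 GB+ join result would still be funneled through the driver), but for completeness, a sketch of how it could be raised; on Databricks this belongs in the cluster's Spark config rather than in code:

# Stopgap only, not a fix for the missing pushdown. spark.driver.maxResultSize
# must be set before the driver starts; on Databricks put it in the cluster's
# "Spark config" (e.g. `spark.driver.maxResultSize 8g`). For a self-managed
# session the equivalent is:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.maxResultSize", "8g")  # 8g is an arbitrary example value
    .getOrCreate()
)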
Question
Is there any way to make Power BI apply the aggregation on the Spark side instead of materializing the join result first?
Power Query
let
    Source = ApacheSpark.Tables("https://xxxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyy", 2, [BatchSize=null]),
    #"Result" = Table.Group(
        Table.Join(
            Source{[Schema="default",Item="Fact"]}[Data],
            "DEP_ID",
            Table.RenameColumns(Source{[Schema="default",Item="Department"]}[Data], {"DEP_ID", "DEP_ID_"}),
            "DEP_ID_",
            JoinKind.Inner
        ),
        {"MONTH", "DEP_NAME"},
        {{"TOTAL_SALARY", each List.Sum([SALARY]), type number}}
    )
in
    #"Result"
Power Query failed job execution plan
From the Spark SQL execution plan you can see that there is no aggregation step, only a join! I think Power BI tries to load the join result (10 GB+) through the Spark driver before applying the GROUP BY aggregation.
Expected execution plan
I can write the same job with PySpark:
# `spark` is the ambient SparkSession (Databricks notebook); dep_path / fact_path are placeholder paths
from pyspark.sql import functions as F

dep = spark.read.csv(dep_path)

spark.read.parquet(fact_path) \
    .join(F.broadcast(dep), ['DEP_ID']) \
    .groupBy('MONTH', 'DEP_NAME') \
    .agg(F.sum('SALARY')) \
    .show(1000)
The plan will be the following (pay attention to the hash aggregate steps at the end):
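The plan screenshot isn't reproduced here, but the same physical plan can be printed with explain(); a minimal sketch, reusing dep and fact_path from the snippet above:

# Print the physical plan of the PySpark job above: it shows a BroadcastHashJoin
# followed by partial/final HashAggregate steps, i.e. the aggregation happens on
# the executors and only ~100 rows ever reach the driver.
agg = (
    spark.read.parquet(fact_path)
    .join(F.broadcast(dep), ["DEP_ID"])
    .groupBy("MONTH", "DEP_NAME")
    .agg(F.sum("SALARY").alias("TOTAL_SALARY"))
)
agg.explain()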
P.S.
AFAIK, Power BI Desktop's "View Native Query" option is disabled for Spark DirectQuery.
UPD
Looks like the issue isn't in the query folding of the join: Power BI for some reason materializes the table before the GROUP BY even without a join. The following query leads to a full table load:
let
    Source = ApacheSpark.Tables("https://xxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyyy", 2, [BatchSize=null]),
    #"Result" = Table.Group(
        Source{[Schema="default",Item="Fact"]}[Data],
        {"MONTH", "DEP_ID"},
        {{"TOTAL_SALARY", each List.Sum([SALARY]), type number}}
    )
in
    #"Result"
Still, the full load happens only with the List.Sum function. List.Count and List.Max work well, even with a table join before the GROUP BY.
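For what it's worth, running the same aggregation directly on the cluster confirms the aggregated result itself is tiny; a sketch using the same placeholder fact_path as above, which suggests the driver limit can only be hit because the unaggregated rows are pulled back:

# Sanity check executed directly in a notebook: the GROUP BY without the join
# (same shape as the Power Query above) yields only a small number of rows.
per_month_dep = (
    spark.read.parquet(fact_path)
    .groupBy("MONTH", "DEP_ID")
    .agg(F.sum("SALARY").alias("TOTAL_SALARY"))
)
print(per_month_dep.count())  # expected to be on the order of ~100 rows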