
Objective

I'm using Power BI Desktop with DirectQuery to a Spark cluster. I want to join two tables and aggregate by the MONTH and DEP_NAME columns. The Fact table is 10 GB+ (it contains the MONTH column), while the Department table is only a few KB (it contains the DEP_ID and DEP_NAME columns). The expected result is very small, about 100 rows.

Issue

Spark fails due to the following exception:

DataSource.Error: ODBC: ERROR [HY000] [Microsoft][Hardy] (35) Error from server: error code: '0' error message: 'Error running query: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 10 tasks (4.1 GB) is bigger than spark.driver.maxResultSize (4.0 GB)'.

I'm pretty sure Power BI tries to materialize the join result (10 GB+) before applying the aggregation.

Question

Is there any way to stop Power BI from executing/materializing the join result before the aggregation is applied?

Power Query

let
    Source = ApacheSpark.Tables("https://xxxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyy", 2, [BatchSize=null]),
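    // Inner-join Fact with Department on DEP_ID (Department's key is renamed
    // to DEP_ID_ to avoid a column-name clash), then group the joined table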
    #"Result" = Table.Group(
        Table.Join(
            Source{[Schema="default",Item="Fact"]}[Data], 
            "DEP_ID", 
            Table.RenameColumns(Source{[Schema="default",Item="Department"]}[Data], {"DEP_ID", "DEP_ID_"}), 
            "DEP_ID_", 
            JoinKind.Inner
        ), 
        {"MONTH", "DEP_NAME"}, 
        {{"TOTAL_SALARY", each List.Sum([SALARY]), type number}}
    )
in
    #"Result"

Execution plan of the failed Power Query job

From the Spark SQL execution plan you can see that there is no aggregation step, only a join! I think Power BI tries to load the join result (10 GB+) through the Spark driver before applying the GROUP BY aggregation.

[Screenshot: Spark SQL execution plan of the failed job, showing a join but no aggregation step]

Expected execution plan

I can write the same job with PySpark:

import pyspark.sql.functions as F

# dep_path and fact_path are placeholders for the actual file locations
dep = spark.read.csv(dep_path)

# Broadcast the tiny Department table and aggregate before collecting results
spark.read.parquet(fact_path) \
    .join(F.broadcast(dep), ['DEP_ID']) \
    .groupBy('MONTH', 'DEP_NAME') \
    .agg(F.sum('SALARY')) \
    .show(1000)

The plan will be the following (pay attention to the HashAggregate steps at the end):

[Screenshot: PySpark execution plan, ending with HashAggregate steps]

P.S.

AFAIK, Power BI Desktop's "View Native Query" option is disabled for Spark DirectQuery.

UPD

It looks like the issue isn't query folding itself: for some reason Power BI materializes the table before the GROUP BY even without a join. The following query leads to a full table load:

let
    Source = ApacheSpark.Tables("https://xxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyyy", 2, [BatchSize=null]),
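    // The same GROUP BY aggregation, but over the Fact table alone (no join)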
    #"Result" = Table.Group(
        Source{[Schema="default",Item="Fact"]}[Data],
        {"MONTH", "DEP_ID"}, 
        {{"TOTAL_SALARY", each List.Sum([SALARY]), type number}}
    )
in
    #"Result"

Still, the full load happens only with the List.Sum function. List.Count and List.Max work fine, even with the table join before the GROUP BY.
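One experiment that might be worth trying (an assumption on my part, not something I've verified against Spark): give SALARY an explicit numeric type before grouping, in case List.Sum over an untyped column is what blocks folding.

let
    Source = ApacheSpark.Tables("https://xxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyyy", 2, [BatchSize=null]),
    // Assumption: an explicit numeric type on SALARY might let SUM fold
    Typed = Table.TransformColumnTypes(
        Source{[Schema="default",Item="Fact"]}[Data],
        {{"SALARY", type number}}
    ),
    Result = Table.Group(
        Typed,
        {"MONTH", "DEP_ID"},
        {{"TOTAL_SALARY", each List.Sum([SALARY]), type number}}
    )
in
    Result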

Comments
Given your data size, I think it'd be better to run the aggregation at the source and materialize the table as part of the data model. Power Query is then out of the loop and your life will be much easier. The best practice I have learned over the years: prepare all data in a data warehouse, never in Power Query. – RADO

@RADO I'm also using DirectQuery for Spark, so theoretically the difference shouldn't be that dramatic. But I think an MPP or even an RDBMS layer would perform much better (at least because Power BI query folding is better tuned for those). Moreover, Spark wasn't built for interactive querying. Still, I need to complete this POC. – VB_
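To illustrate RADO's suggestion: if the aggregation were materialized at the source, here as a hypothetical default.Fact_Monthly_Salary table (one row per MONTH/DEP_NAME), the Power Query side would reduce to a plain table reference. A minimal sketch, assuming that table exists:

let
    Source = ApacheSpark.Tables("https://xxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyyy", 2, [BatchSize=null]),
    // Fact_Monthly_Salary is a hypothetical pre-aggregated table
    // (MONTH, DEP_NAME, TOTAL_SALARY), built and refreshed at the source
    Result = Source{[Schema="default",Item="Fact_Monthly_Salary"]}[Data]
in
    Result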

1 Answer

Instead of joining and then grouping, maybe you could do the reverse: group by MONTH and DEP_ID on your Fact table first, and then join with the Department table to get the DEP_NAME.

Note: if multiple DEP_IDs share the same DEP_NAME, you'll need one more group-by after the join.
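A minimal sketch of that reordering in M, reusing the Source from the question (untested against Spark, so treat it as a starting point rather than a verified fix):

let
    Source = ApacheSpark.Tables("https://xxxxxxxxx.azuredatabricks.net:443/sql/protocolv1/o/yyyyyyyyyyy", 2, [BatchSize=null]),
    // Aggregate first, so only ~100 rows come out of the 10 GB Fact table
    Grouped = Table.Group(
        Source{[Schema="default",Item="Fact"]}[Data],
        {"MONTH", "DEP_ID"},
        {{"TOTAL_SALARY", each List.Sum([SALARY]), type number}}
    ),
    // Then join the small aggregate with the small Department table
    Result = Table.Join(
        Grouped,
        "DEP_ID",
        Table.RenameColumns(Source{[Schema="default",Item="Department"]}[Data], {"DEP_ID", "DEP_ID_"}),
        "DEP_ID_",
        JoinKind.Inner
    )
in
    Result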