4
votes

Spark Dataframes has a method withColumn to add one new column at a time. To add multiple columns, a chain of withColumns are required. Is this the best practice to do this?

I feel that usingmapPartitions has more advantages. Let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions. It also helps if I have a database connection that I would prefer to open once per RDD partition.

My question has two parts.

The first part, this is my implementation of mapPartitions. Are there any unforeseen issues with this approach? And is there a more elegant way to do this?

df2 = df.rdd.mapPartitions(add_new_cols).toDF()

def add_new_cols(rows):
    db = open_db_connection()
    new_rows = []
    new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
    i = 0
    for each_row in rows:
        i += 1
        # conditionally omit rows
        if i % 3 == 0:
            continue
        db_result = db.get_some_result(each_row.existing_col_2)
        new_col_1 = ''.join([db_result, "_NEW"])
        new_col_2 = db_result
        new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
        new_rows.append(new_f_row)

    db.close()
    return iter(new_rows)

The second part, what are the tradeoffs in using mapPartitions over a chain of withColumn and filter?

I read somewhere that using the available methods with Spark DFs are always better than rolling out your own implementation. Please let me know if my argument is wrong. Thank you! All thoughts are welcome.

2
can you share an example of a problem you are unable to do? as of now your question is too broad and sort of unclear. - mtoto
A chain of withColumns will not be executed serially if that's what you're worried about. Lazy spark will optimize the operations. - pault
Well to avoid opening the db connection twice, you could return a list and then split the output into columns. Something like df = df.withColumn('list_output', myUDF()).select("*", col('list_output')[0].alias('new_col1'), col('list_output)[1].alias('new_col2')).drop("list_output"). Converting to rdd and back to DF is slow, but I'm not an expert on this. - pault
@void take a look at this post. You could also get fancy and return a StructType() from your udf and then use list_output.* - pault

2 Answers

6
votes

Are there any unforeseen issues with this approach?

Multiple. The most severe implications are:

  • A few times higher memory footprint to compared to plain DataFrame code and significant garbage collection overhead.
  • High cost of serialization and deserialization required to move data between execution contexts.
  • Introducing breaking point in the query planner.
  • As is, cost of schema inference on toDF call (can be avoided if proper schema is provided) and possible re-execution of all preceding steps.
  • And so on...

Some of these can be avoided with udf and select / withColumn, other cannot.

let's say I have a chain of three withColumns and then one filter to remove Rows based on certain conditions. These are four different operations (I am not sure if any of these are wide transformations, though). But I can do it all in one go if I do a mapPartitions

Your mapPartitions doesn't remove any operations, and doesn't provide any optimizations, that Spark planner cannot excluding. Its only advantage is that it provides a nice scope for expensive connection objects.

I read somewhere that using the available methods with Spark DFs are always better than rolling out your own implementation

When you start using executor-side Python logic you already diverge from Spark SQL. Doesn't matter if you use udf, RDD or newly added vectorized udf. At the end of the day you should make decision based on overall structure of your code - if it is predominantly Python logic executed directly on the data it might be better to stick with RDD or skip Spark completely.

If it is just a fraction of the logic, and doesn't cause severe performance issue, don't sweat about it.

-1
votes

using df.withColumn() is the best way to add columns. they're all added lazily