50
votes

Calling collect() on an RDD returns the entire dataset to the driver, which can cause an out-of-memory error, so it should be avoided.

Will collect() behave the same way if called on a dataframe?
What about the select() method?
Does it also work the same way as collect() if called on a dataframe?

6 Answers

62
votes

Actions vs Transformations

  • Collect (Action) - Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

spark-sql doc

select(*cols) (transformation) - Projects a set of expressions and returns a new DataFrame.

Parameters: cols – list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataFrame.

>>> df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]

Calling the select(column-name1, column-name2, etc.) method on a dataframe returns a new dataframe that holds only the columns named in the select() call.

For example, assume df has several columns, including "name" and "value".

df2 = df.select("name","value")

df2 will hold only two of df's columns: "name" and "value".

As the result of select, df2 stays distributed across the executors. It is not brought to the driver, as it would be with collect().

sql-programming-guide

df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

You can run collect() on a dataframe (spark docs)

>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

spark docs

To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).
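The difference between fetching everything and fetching a few elements can be sketched without a cluster. The snippet below is a plain-Python analogy, not Spark code: the `rows` generator and the two log lists are made up for illustration, `list(...)` stands in for collect() and `islice` stands in for take(n).

```python
from itertools import islice


def rows(n, log):
    """Yield n fake rows, recording each one that is actually produced."""
    for i in range(n):
        log.append(i)
        yield {"id": i}


# collect() analogue: every row is materialized in one place.
produced_all = []
everything = list(rows(1000, produced_all))

# take(5) analogue: stop pulling rows after the first five.
produced_few = []
first_five = list(islice(rows(1000, produced_few), 5))

print(len(everything), len(produced_all))  # 1000 1000
print(len(first_five), len(produced_few))  # 5 5
```

Only five rows are ever produced in the second case, which is why a bounded fetch is the safer habit when you just want to look at some data.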

9
votes

Calling select results in lazy evaluation. For example:

val df1 = df.select("col1")
val df2 = df1.filter("col1 == 3")

Both statements above only build a lazy execution plan, which runs when you call an action on that dataframe, such as show or collect.

val rows = df2.collect()  // action: runs the plan and returns an Array[Row] to the driver

Use .explain at the end of your transformations to inspect the plan. More detailed info is here: Transformations and Actions
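To make the "lazy plan, then action" idea concrete, here is a minimal toy model in plain Python. It is only a sketch under stated assumptions: `LazyFrame`, `source` and `reads` are hypothetical names invented for this illustration, and none of this is Spark's API or how Spark is implemented. The point is just that select and filter record steps, while collect is what finally reads the data.

```python
class LazyFrame:
    """Toy stand-in for a dataframe: transformations record steps, actions run them."""

    def __init__(self, source, plan=()):
        self._source = source     # callable that returns an iterable of dict rows
        self._plan = tuple(plan)  # recorded (description, function) steps

    def select(self, *cols):
        # Transformation: returns a new LazyFrame and touches no data.
        step = ("select " + ", ".join(cols),
                lambda rows: ({c: r[c] for c in cols} for r in rows))
        return LazyFrame(self._source, self._plan + (step,))

    def filter(self, predicate):
        # Transformation: also only recorded.
        step = ("filter", lambda rows: (r for r in rows if predicate(r)))
        return LazyFrame(self._source, self._plan + (step,))

    def explain(self):
        # Describe the recorded plan without running it.
        return [name for name, _ in self._plan]

    def collect(self):
        # Action: read the source, apply every recorded step, return a list.
        rows = self._source()
        for _, fn in self._plan:
            rows = fn(rows)
        return list(rows)


reads = []  # records each time the underlying data is actually scanned


def source():
    reads.append("scan")
    return [{"col1": 3, "col2": "a"}, {"col1": 4, "col2": "b"}]


df = LazyFrame(source)
df2 = df.select("col1").filter(lambda r: r["col1"] == 3)
reads_before = len(reads)  # nothing has been scanned yet
result = df2.collect()     # the scan happens here

print(df2.explain())        # ['select col1', 'filter']
print(reads_before, reads)  # 0 ['scan']
print(result)               # [{'col1': 3}]
```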

6
votes

Select is used for projecting some or all fields of a dataframe. It won't give you a value as output, but a new dataframe. It's a transformation.

4
votes

Select is a transformation, not an action, so it is lazily evaluated (it won't actually do the calculations, it just records the operations). Collect is an action.

Try:

df.limit(20).collect()

4
votes

To answer the questions directly:

Will collect() behave the same way if called on a dataframe?

Yes, spark.DataFrame.collect is functionally the same as spark.RDD.collect. They serve the same purpose on these different objects.

What about the select() method?

There is no such thing as spark.RDD.select, so it cannot be the same as spark.DataFrame.select.

Does it also work the same way as collect() if called on a dataframe?

The only thing that is similar between select and collect is that they are both functions on a DataFrame. They have absolutely zero overlap in functionality.

Here's my own description: collect is the opposite of sc.parallelize. select is the same as the SELECT in any SQL statement.

If you are still having trouble understanding what collect actually does (for either RDD or DataFrame), then you need to look up some articles about what spark is doing behind the scenes. e.g.:

1
votes

Short answer in bold:

  • collect is mainly to serialize
    (loss of parallelism, preserving all other data characteristics of the dataframe).
    For example, with a PrintWriter pw you can't call df.foreach( r => pw.write(r) ) directly; you must use collect before foreach: df.collect.foreach(etc).
    PS: the "loss of parallelism" is not a "total loss", because after serialization the data can be distributed to the executors again.

  • select is mainly to select columns, similar to projection in relational algebra
    (only similar in the framework's context, because Spark's select does not deduplicate data).
    So it is also a complement of filter in the framework's context.
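The remark about deduplication in the list above can be shown with a small plain-Python analogy. This is not Spark code: the `select` helper and the sample rows are made up for illustration. Projecting a column keeps one output row per input row, whereas a relational-algebra projection has set semantics and would drop the repeats.

```python
rows = [
    {"name": "Alice", "age": 2},
    {"name": "Alice", "age": 5},
    {"name": "Bob", "age": 5},
]


def select(rows, *cols):
    """Keep only the requested columns; one output row per input row."""
    return [{c: r[c] for c in cols} for r in rows]


# select-style projection: the duplicate "Alice" survives.
names = select(rows, "name")

# Relational-algebra projection: set semantics, duplicates removed.
distinct_names = sorted({r["name"] for r in names})

print(names)           # [{'name': 'Alice'}, {'name': 'Alice'}, {'name': 'Bob'}]
print(distinct_names)  # ['Alice', 'Bob']
```

In Spark itself you would get the second behaviour by following select with distinct().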


Commenting on the explanations in other answers: I like Jeff's classification of Spark operations into transformations (such as select) and actions (such as collect). It is also good to remember that transformations (including select) are lazily evaluated.