My understanding is that if I cache() a DataFrame and then trigger an action like df.take(1) or df.count(), Spark should compute the DataFrame and save it in memory, and whenever that cached DataFrame is referenced later in the program, it should use the already computed DataFrame from the cache.

But that is not how my program is working.

I have a DataFrame like the one below, which I am caching, and then I immediately run a df.count action.

  1. val df = inputDataFrame.select(...).where(...).withColumn("newcol", lit("")).cache()

  2. df.count

When I run the program, the Spark UI shows that the first line runs for 4 minutes, and when execution reaches the second line it runs for another 4 minutes. Is the first line basically being computed twice?

Shouldn't the first line be computed and cached when the second line triggers the action?

How do I resolve this behavior? I am stuck, please advise.


2 Answers


My understanding is that if I cache() a DataFrame and then trigger an action like df.take(1) or df.count(), Spark should compute the DataFrame and save it in memory,

That is not correct. A simple cache followed by count (take wouldn't work even for an RDD, since it computes only as many partitions as it needs) is a valid method for RDDs, but it is not for Datasets, which use much more advanced optimizations. With the query:

df.select(...).where(...).withColumn("newcol", lit("")).count()

any column that is not used in the where clause can be ignored, so the cached data may never be fully materialized (see the sketch below).
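
A minimal sketch of this effect (hypothetical data; assumes a spark-shell session where spark and its implicits are in scope):

import org.apache.spark.sql.functions.lit
import spark.implicits._

// Hypothetical stand-in for the question's pipeline.
val df = spark.range(1000000).toDF("id")
  .where($"id" % 2 === 0)
  .withColumn("newcol", lit(""))
  .cache()

// count() only needs the number of rows, so "newcol" (and any other
// column not referenced in the where clause) can be pruned away,
// and the cached data may never be fully materialized.
df.count()
df.explain()  // inspect the optimized plan to see what was pruned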

There is an important discussion about this on the developer list; quoting Sean Owen:

I think the right answer is "don't do that" but if you really had to you could trigger a Dataset operation that does nothing per partition. I presume that would be more reliable because the whole partition has to be computed to make it available in practice. Or, go so far as to loop over every element.

Translated to code:

df.foreach(_ => ())
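
Applied to the question's pipeline, a sketch (inputDataFrame and the column names are placeholders) would be:

import org.apache.spark.sql.functions.lit
import spark.implicits._

val df = inputDataFrame
  .select($"a", $"b")                // hypothetical columns
  .where($"a" > 0)
  .withColumn("newcol", lit(""))
  .cache()

df.foreach(_ => ())  // a no-op per element, but it forces every partition to be computed and cached
df.count()           // now answered from the cache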

There is also

df.registerTempTable("df")
sqlContext.sql("CACHE TABLE df")

which is eager, but this approach is no longer documented (Spark 2 and onward) and should be avoided.
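
For reference, the Spark 2+ spelling of the same eager SQL-level cache (a sketch; spark is the SparkSession) replaces the deprecated registerTempTable:

df.createOrReplaceTempView("df")
spark.sql("CACHE TABLE df")  // CACHE TABLE is eager by default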


No. If you call cache on a DataFrame, it is not cached at that moment; it is only "marked" for potential future caching. The actual caching happens only when an action follows later. You can also see your cached DataFrame in the Spark UI under "Storage".
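
A short sketch of that behavior (hypothetical data; run e.g. in spark-shell):

val df = spark.range(10).toDF("id").cache()  // only marked for caching; nothing is computed yet
// At this point nothing shows up under "Storage" in the Spark UI.
df.count()  // first action: computes the DataFrame and populates the cache
df.count()  // served from the in-memory cache; "Storage" now lists the DataFrame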

Another problem in your code is that count on a DataFrame does not compute the entire DataFrame, because not all columns need to be evaluated to count rows. You can use df.rdd.count() to force full evaluation (see How to force DataFrame evaluation in Spark).
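
A sketch of the difference (assuming the cached df from the question):

df.count()      // the optimizer may prune columns that are not needed for the row count
df.rdd.count()  // goes through the RDD representation, materializing every row fully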

The remaining question is why your first operation takes so long even though no action is called. This may be related to the caching logic (e.g. size estimation) that runs when cache is called (see e.g. Why is rdd.map(identity).cache slow when rdd items are big?).