Coming back to this question after gaining a bit more experience with Apache Spark, to complement randal's answer.
You can also use a UDF to increment a counter after each step.
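import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, max, udf}
import org.apache.spark.util.LongAccumulator

// One accumulator per step whose output rows we want to count.
val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")

// Identity UDF: returns x unchanged and bumps the accumulator once per row it sees.
def countUdf(acc: LongAccumulator): UserDefinedFunction = udf { (x: Int) =>
  acc.add(1)
  x
}

myDataFrame
  .filter(col("x") === lit(3))
  .withColumn("x", countUdf(filterCounter)(col("x")))
  .groupBy(col("x"))
  .agg(max("y"))
  .withColumn("x", countUdf(groupByCounter)(col("x")))
  .join(myOtherDataframe, col("x") === col("y"))
  .withColumn("x", countUdf(joinCounter)(col("x")))
  .count()

// Accumulator values are only meaningful after the action above has run.
println(s"count for filter = ${filterCounter.value}")
println(s"count for group by = ${groupByCounter.value}")
println(s"count for join = ${joinCounter.value}")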
This should be more efficient because Spark only has to deserialize the column used in the UDF. It has to be used carefully, though, because Catalyst can more easily reorder the operations (for example, by pushing a filter ahead of the call to the UDF), in which case a counter no longer measures the step you attached it to.
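One way to reduce the reordering risk is to flag the counting UDF as non-deterministic with `asNondeterministic()` (available on `UserDefinedFunction` since Spark 2.3), which should make Catalyst more conservative about moving or duplicating the call. Below is a minimal sketch of that idea, not a guarantee: the object name `CountingUdfSketch`, the local session and the tiny input are made up for illustration. It uses `collect()` rather than `count()` on the assumption that an action which actually reads the column leaves the optimizer less room to drop the UDF.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, udf}
import org.apache.spark.util.LongAccumulator

// Hypothetical wrapper object, only here to make the sketch self-contained.
object CountingUdfSketch {

  // Same identity UDF as above, but marked non-deterministic so the optimizer
  // treats it as something it should not freely reorder or duplicate.
  def countUdf(acc: LongAccumulator): UserDefinedFunction =
    udf { (x: Int) =>
      acc.add(1)
      x
    }.asNondeterministic()

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session; in a real job you would reuse yours.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("counting-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    val filterCounter = spark.sparkContext.longAccumulator("filter-counter")

    // Made-up input data for illustration.
    val df = Seq(1, 3, 3, 5).toDF("x")

    val rows = df
      .filter(col("x") === lit(3))
      .withColumn("x", countUdf(filterCounter)(col("x")))
      .collect() // the action reads column x, so the UDF output is actually needed

    println(s"rows returned = ${rows.length}")
    println(s"count for filter = ${filterCounter.value}")

    spark.stop()
  }
}
```

Even with this flag, the numbers are best treated as approximate: Spark may apply accumulator updates made inside transformations more than once when a task or stage is re-executed.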