You can collect the unique state values and simply map over the resulting array:
// Collect the distinct State values as a local array
val states = df.select("State").distinct.collect.flatMap(_.toSeq)

// One filtered DataFrame per state value (<=> is null-safe equality)
val byStateArray = states.map(state => df.where($"State" <=> state))
or, to get a map keyed by state value:
val byStateMap = states
.map(state => (state -> df.where($"State" <=> state)))
.toMap
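You can then pull out the DataFrame for a single level by key, for example (the "TX" value here is purely illustrative):

// Look up one state's DataFrame, if that key exists in the map
byStateMap.get("TX").foreach(_.show())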
The same thing in Python:
from itertools import chain
from pyspark.sql.functions import col

# Collect the distinct values of state, flattening the Row objects
states = chain(*df.select("state").distinct().collect())

# One filtered DataFrame per value (eqNullSafe is null-safe equality)
df_by_state = {
    state: df.where(col("state").eqNullSafe(state)) for state in states
}
The obvious problem here is that it requires a full data scan for each level, so it is an expensive operation. If you're just looking for a way to split the output, see also How do I split an RDD into two or more RDDs?
In particular, you can write the Dataset partitioned by the column of interest:
val path: String = ???  // output location (placeholder)
df.write.partitionBy("State").parquet(path)
and read back if needed:
// Either filter on the partition column (Spark prunes to the matching directories):
for { state <- states } yield spark.read.parquet(path).where($"State" === state)

// or read the Hive-style State=<value> subdirectory directly:
for { state <- states } yield spark.read.parquet(s"$path/State=$state")
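If you are unsure whether a given query actually benefits from the partitioning, the physical plan will tell you; a quick illustrative check (the "TX" value is hypothetical):

// The predicate should show up under PartitionFilters in the scan node,
// meaning only the matching State=<value> directory is read.
spark.read.parquet(path).where($"State" === "TX").explain()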
Depending on the size of the data, the number of levels you split by, and the storage and persistence level of the input, this might be faster or slower than multiple filters.
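If you do stick with the multiple-filter approach, persisting the input first keeps Spark from rescanning the source for every value; a minimal sketch, assuming the data fits the chosen storage level (byStateCached is just an illustrative name):

import org.apache.spark.storage.StorageLevel

// Keep the input around so each per-state filter reuses it
df.persist(StorageLevel.MEMORY_AND_DISK)

val byStateCached = states
  .map(state => state -> df.where($"State" <=> state))
  .toMap

// ... use the per-state DataFrames ...
df.unpersist()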