Please excuse the PySpark noob question.
The final stage in producing my Spark DataFrame in PySpark is the following:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# Index the categorical kpID column and one-hot encode it into KPvec
indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)
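For context, the encoder emits a SparseVector per row, which is why I convert it to a plain float array next. A minimal toy sketch (hypothetical id/kpID values, assuming an active SparkSession named spark):

# Hypothetical toy input to show what the pipeline emits
toy = spark.createDataFrame([("a", "kp1"), ("a", "kp2"), ("b", "kp1")], ["id", "kpID"])
toy_model = Pipeline(stages=[
    StringIndexer(inputCol="kpID", outputCol="KPindex"),
    OneHotEncoderEstimator(inputCols=["KPindex"], outputCols=["KPvec"])]).fit(toy)
toy_model.transform(toy).show(truncate=False)
# KPvec comes back as a SparseVector, e.g. (1,[0],[1.0]) for the first category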
from pyspark.ml.linalg import DenseVector
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, FloatType

# Convert the one-hot SparseVector into a plain array of floats
def sparse_to_array(v):
    v = DenseVector(v)
    new_array = [float(x) for x in v]
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
df_bayes = df_bayes.select('id', sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000, col('id'))
# Element-wise sum of the one-hot arrays per id (one F.sum per position, len(kids) positions)
df_bayes = df_bayes.select('id', 'KPvec').groupby('id').agg(F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))]).alias("KPvec")).cache()
I am trying to aggregate (element-wise sum, grouped by id) a sparse vector that represents a one-hot encoded categorical variable.
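For concreteness, here is a toy sketch of the per-id sum I am after, using hypothetical data already in the post-conversion array form (assumes the F import above and an active SparkSession named spark):

# Hypothetical rows: each holds a one-hot array for one observation
toy = spark.createDataFrame(
    [("a", [1.0, 0.0, 0.0]), ("a", [0.0, 1.0, 0.0]), ("b", [0.0, 0.0, 1.0])],
    ["id", "KPvec"])
toy_summed = toy.groupby("id").agg(
    F.array(*[F.sum(F.col("KPvec")[i]) for i in range(3)]).alias("KPvec"))
# Expected per-id sums: a -> [1.0, 1.0, 0.0], b -> [0.0, 0.0, 1.0]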
On my EMR cluster this takes 188 s to complete, and the resulting DataFrame has ~50M rows. I then try to write this DataFrame to Parquet.
I have tried:
df_bayes.write.format("parquet") \
    .partitionBy("id") \
    .bucketBy(500, "KPvec") \
    .option("path", "s3://..."+"output.parquet") \
    .saveAsTable("output")
And:
df_bayes.repartition(1500, col('id')).write.parquet("s3://..."+"output.parquet")
And without re-partitioning.
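For completeness, the unpartitioned attempt was just the plain writer call, roughly:

df_bayes.write.parquet("s3://..."+"output.parquet")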
In each case the job takes a very long time and eventually fails with an ExecutorLostFailure (which is due to the EMR cluster running many spot instances).
Despite the earlier cache(), I suspect that much of this work is not the Parquet write itself but rather the computation steps I requested beforehand.
I suspect this because when I simply try to compute the dimensions of the DataFrame, the DAG visualization shows the same stages being repeated.
Those repeated stages, together with the ~6 GB of shuffle writes before the job fails, suggest to me that there is a big inefficiency in how I am performing the calculation.
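One way to separate the aggregation cost from the write itself would be to force the cached result to materialize with a cheap action before writing; a minimal sketch:

# Trigger the aggregation and fill the cache before the write starts
df_bayes.count()
df_bayes.write.parquet("s3://..."+"output.parquet")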
Also, when I run explain() I get the following physical plan:
== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
+- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
+- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
+- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
+- Exchange hashpartitioning(id#1, 15000)
+- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
+- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
+- *(1) Project [KPvec#3, id#1]
+- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
Can anyone point out what I am doing wrong here?
Thank you in advance.

