
Please excuse the PySpark noob question.

My final stage in producing a Spark dataframe in PySpark is the following:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.linalg import DenseVector
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, FloatType

# Index the categorical column and one-hot encode it
indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)

# Convert the (sparse) ML vector into a plain Python list of floats
def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
df_bayes = df_bayes.select('id', sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000, col('id'))

# Element-wise sum of the one-hot arrays per id; len(kids) is the number of categories
df_bayes = (df_bayes.select('id', 'KPvec')
            .groupby('id')
            .agg(F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))]).alias("KPvec"))
            .cache())

I am trying to aggregate (element-wise sum) a sparse vector that represents a one-hot-encoded categorical variable.
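
To make the intent concrete, here is a tiny self-contained sketch of the aggregation I am after, using made-up toy data (three categories, so each array has length 3) rather than my real schema:

# Toy data only: 'spark' is the active SparkSession
toy = spark.createDataFrame(
    [("a", [1.0, 0.0, 0.0]),
     ("a", [0.0, 1.0, 0.0]),
     ("b", [0.0, 0.0, 1.0])],
    ["id", "KPvec"])

n = 3  # plays the role of len(kids) in my real code
toy_summed = toy.groupby("id").agg(
    F.array(*[F.sum(F.col("KPvec")[i]) for i in range(n)]).alias("KPvec"))
# toy_summed: id "a" -> [1.0, 1.0, 0.0], id "b" -> [0.0, 0.0, 1.0]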

On my EMR cluster this takes 188s to complete, and the resulting dataframe has ~50M rows. I then try to write this dataframe to Parquet.

I have tried:

df_bayes.write.format("parquet") \
.partitionBy("id") \
.bucketBy(500,"KPvec") \
.option("path", "s3://..."+"output.parquet") \
.saveAsTable("output")

And:

df_bayes.repartition(1500, col('id')).write.parquet("s3://..." + "output.parquet")

And without re-partitioning.

In each case, the job takes a very long time and eventually fails with an ExecutorLostFailure (which is due to the EMR cluster running on many spot instances).

Here is the Spark DAG visualization:

[DAG visualization image]

In spite of caching beforehand, I suspect that many of these stages do not actually relate to the Parquet write itself but rather to the computation steps I requested earlier, since Spark only evaluates them lazily when the write action runs.

I suspect that this is the case because if I try to calculate the dimensions of the dataframe, I see that the DAG visualization is:

[DAG visualization image]

The repeating stages and the ~6 GB of shuffle writes before the job fails indicate to me that there is a big inefficiency in how I perform the calculation.
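
Coming back to the caching point above, here is a minimal sketch of what I mean by separating the computation from the write: force the already-cached df_bayes to materialize with an action first (illustrative only, same elided S3 path as above):

# df_bayes is already cached above; an action forces the aggregation to run and fill the cache
row_count = df_bayes.count()
print(row_count)

# the subsequent write should then mostly stream out cached rows instead of recomputing them
df_bayes.write.mode("overwrite").parquet("s3://..." + "output.parquet")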

Also, when I run explain() I get the following:

== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
   +- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
            +- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
               +- Exchange hashpartitioning(id#1, 15000)
                  +- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
                     +- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
                        +- *(1) Project [KPvec#3, id#1]
                           +- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
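
As far as I can tell, the plan expands every element of KPvec into its own sum(...) expression (hence the "... 52109 more fields"), which is presumably what makes the aggregation stage so heavy. One alternative I have been considering, sketched below but not tested at this scale, is to drop to the RDD API and sum the arrays per id as NumPy vectors, so the vector width never shows up in the plan. Here df_bayes refers to the dataframe right after the sparse_to_array select, i.e. before the groupby/agg:

import numpy as np

# Sum the per-row arrays per id without generating one column expression per element
summed_rdd = (df_bayes
              .select('id', 'KPvec')
              .rdd
              .map(lambda row: (row['id'], np.array(row['KPvec'], dtype='float64')))
              .reduceByKey(lambda a, b: a + b))

# Back to a DataFrame with plain Python lists so it can be written out
df_summed = summed_rdd.map(lambda kv: (kv[0], kv[1].tolist())).toDF(['id', 'KPvec'])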

Can anyone point me in the direction of what I am doing wrong here?

Thank you in advance.


1 Answer


So to answer my own question in case it helps someone else: the solution is to collect_set the features you want to one-hot encode and then use CountVectorizer instead of Spark ML's OneHotEncoder.

from pyspark.ml.feature import CountVectorizer

# Collect the distinct features per id into a single array column
df = df.select('id', 'feature').groupby('id').agg(F.collect_set('feature').alias('feature'))

countModel = CountVectorizer().setInputCol("feature").setOutputCol("KPvec").fit(df)
df = countModel.transform(df).select('id', 'KPvec')

Then you can simply write it to Parquet. For me, this is quite fast.
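
For completeness, the write itself is then just a plain Parquet save; a minimal sketch (the partition count and the elided path are placeholders):

df = df.repartition(500, 'id')  # placeholder partition count, tune to your data
df.write.mode("overwrite").parquet("s3://..." + "output.parquet")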