1
votes

I am using EMR step functions to analyze data. I wanted to store the count of the analyzed dataframe to decide whether I can save it as a csv or parquet. I would prefer CSV but if the size is too big, I wont be able to download it and use it on my laptop. I used the count() method to store it to a int variable limit When i try using the following code:

coalesce(1).write.format("text").option("header", "false").mode("overwrite").save("output.txt")

It says that:

int doesnt have any attribute called write

Is there a way to write integers or string to a file so that I can open it in my s3 bucket and inspect after the EMR step has run?

Update: I tried the dataframe method as suggested by @Shu, but am getting the following error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 19396, ip-10-210-13-34.ec2.internal, executor 11): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)

What could be the root cause of this?

1

1 Answers

1
votes

You can parallelize the int variable to create an rdd then write to HDFS using .saveAsTextFile

df.show()
#+---+
#| _1|
#+---+
#|  a|
#|  b|
#+---+
limit=df.count()
spark.sparkContext.parallelize([limit]).coalesce(1).saveAsTextFile("<path>")

#content of file
#cat <path>/part-00000
#2 

Other way would be creating dataframe from count variable then write in csv format as header false.

from pyspark.sql.types import *
spark.createDataFrame(spark.sparkContext.parallelize([limit]),IntegerType()).coalesce(1).write.format("csv").option("header", "false").mode("overwrite").save("<path>")

#or in text format
spark.createDataFrame(spark.sparkContext.parallelize([limit]),StringType()).coalesce(1).write.format("text").mode("overwrite").save("<path>")

#cat part-*
#2