1 vote

I am a Spark newbie. I have a simple pyspark script: it reads a JSON file, flattens it and writes it to an S3 location as a compressed Parquet file.
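Roughly, the script does something like this (the paths, bucket name and flattening step below are just placeholders, not the real code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("flatten-json").getOrCreate()

# Placeholder paths; the real job reads from and writes to S3.
df = spark.read.json("s3a://my-bucket/input/data.json")
flat_df = df.select("id", "payload.*")  # stand-in for the real flattening logic
flat_df.write.mode("overwrite").parquet("s3a://my-bucket/output/")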

The read and transformation steps run very fast and use 50 executors (which I set in the conf). But the write stage takes a long time and writes only one large file (480 MB).

How is the number of files saved decided? Can the write operation be sped up somehow?

Thanks, Ram.


2 Answers

2 votes

The number of files output is equal to the number of partitions of the RDD being saved. In the sample below, the RDD is repartitioned to control the number of output files.

Try:

repartition(numPartitions) - Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test")

The number of files output is the same as the number of partitions of the RDD.

$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test
Found 3 items
-rw-r--r--   1 cloudera cloudera          0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS
-rw-r--r--   1 cloudera cloudera    1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000
-rw-r--r--   1 cloudera cloudera    1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001

Also check this: coalesce(numPartitions)
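For example (the output path is just an illustration), coalesce avoids a full shuffle when all you want is fewer partitions:

# coalesce(n) merges existing partitions without a full shuffle, which is
# usually cheaper than repartition(n) when you only want fewer output files.
dataRDD.coalesce(1).saveAsTextFile("/user/cloudera/sqoop_import/orders_coalesced")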



Update:

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

...but this is the minimum number of possible partitions, so it is not guaranteed.

So if you want to repartition on read, you should use this:

dataRDD = sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2)
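To see how many partitions you actually ended up with, here is a quick check in the pyspark shell (same illustrative path as above); minPartitions is only a lower bound, while repartition gives an exact count:

dataRDD = sc.textFile("/user/cloudera/sqoop_import/orders", minPartitions=4)
print(dataRDD.getNumPartitions())   # at least 4, possibly more

dataRDD = dataRDD.repartition(2)
print(dataRDD.getNumPartitions())   # exactly 2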
1 vote

There are two different things to consider:

  1. HDFS block size: The HDFS block size is configurable in hdfs-site.xml (128 MB by default). If a file is larger than the block size, the rest of the file's data is assigned to a new block. This is done internally and is not something you see; the whole process is sequential.

  2. Partitions: This is where Spark's parallelism comes in. If you do not manually provide a number of partitions, it defaults to one partition per HDFS block under the default configuration. If you want to customize the number of partitioned files, you can use repartition(n), where n is the number of partitions (see the sketch after this list). These partition files are visible in HDFS when you browse the output directory.
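Since the original question writes a DataFrame as Parquet, here is a minimal sketch of the same idea at the DataFrame level (the paths are made up): one output file is written per partition.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioned-write").getOrCreate()

df = spark.read.json("s3a://my-bucket/input/data.json")   # hypothetical input path

# 8 partitions -> 8 part files in the output directory.
df.repartition(8).write.mode("overwrite").parquet("s3a://my-bucket/output/")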

Also, to improve performance you can pass settings such as the number of executors, executor memory and cores per executor when launching spark-submit / pyspark / spark-shell. Write performance also depends heavily on the file format and the compression codec used.
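For instance, the same executor settings can be set programmatically through SparkConf; the values below are only examples, not a recommendation:

from pyspark import SparkConf, SparkContext

# These keys mirror the --num-executors / --executor-memory / --executor-cores
# options of spark-submit; tune the values for your own cluster.
conf = (SparkConf()
        .set("spark.executor.instances", "50")
        .set("spark.executor.memory", "4g")
        .set("spark.executor.cores", "2"))
sc = SparkContext(conf=conf)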

Thanks for reading.