16
votes

I have a file in hdfs which is distributed across the nodes in the cluster.

I'm trying to get a random sample of 10 lines from this file.

In the PySpark shell, I read the file into an RDD using:

>>> textFile = sc.textFile("/user/data/myfiles/*")

Then I want to simply take a sample. The cool thing about Spark is that there are commands like takeSample. Unfortunately, I think I'm doing something wrong, because the following takes a really long time:

>>> textFile.takeSample(False, 10, 12345)

So I tried creating a partition on each node and then instructing each node to sample its partition using the following command:

>>> textFile.partitionBy(4).mapPartitions(lambda blockOfLines: blockOfLines.takeSample(False, 10, 1234)).first()

But this gives the error ValueError: too many values to unpack:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/serializers.py", line 117, in dump_stream
    for obj in iterator:
  File "/opt/cloudera/parcels/CDH-5.0.2-1.cdh5.0.2.p0.13/lib/spark/python/pyspark/rdd.py", line 821, in add_shuffle_key
    for (k, v) in iterator:
ValueError: too many values to unpack

How can I sample 10 lines from a large distributed data set using spark or pyspark?

3
I don't think this is an issue with Spark, see stackoverflow.com/questions/7053551/… - aaronman
@aaronman you're correct in the sense that the "too many values" error is definitely a Python error. I'll add more details about the error message. My hunch is that there's something wrong with my PySpark code though. Are you able to run this code successfully on your Spark setup? - mgoldwasser
I only really use the Scala Spark API. I think the functional style of Scala fits really well with MapReduce in general. - aaronman
@aaronman I'm open to a Scala solution! - mgoldwasser
@samthebest - I don't know if I'm missing something here. Both Python and Scala are functional languages, and Spark has both a Python and a Scala API. Is this anything more than a matter of preference? - mgoldwasser

3 Answers

30
votes

Try using textFile.sample(False, fraction, seed) instead (the first argument is written false in Scala). takeSample will generally be very slow because it calls count() on the RDD. It needs the count because otherwise it could not sample evenly from each partition: it combines the count with the sample size you asked for to compute a fraction, then calls sample internally. sample is fast because it just flips a random boolean for each row, one that comes up true with probability fraction, so it never needs to call count.
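
To make the mechanism concrete, here is a minimal plain-Python sketch of the per-row coin flip described above. It is an illustration only, not Spark's actual implementation; the function name bernoulli_sample and the in-memory list standing in for an RDD are assumptions made for this example.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`.

    Hypothetical helper for illustration only; it mimics the idea behind
    sampling without replacement, not Spark's real code path.
    """
    rng = random.Random(seed)
    # One pass, one random draw per row; the total row count is never needed.
    return [row for row in rows if rng.random() < fraction]


# An in-memory list standing in for the lines of a distributed file.
rows = ["line %d" % i for i in range(100000)]
picked = bernoulli_sample(rows, 0.0001, 12345)

# The size of `picked` varies with the seed; only its expected value
# (fraction * number of rows, here about 10) is fixed.
print(len(picked))
```

Because every row is decided on its own, the work parallelizes across partitions without any global pass, which is why no count is required. The price is that the number of rows returned is only approximately fraction times the total.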

In addition, although I don't think this is happening to you, if the sample that comes back is not big enough, takeSample calls sample again, which obviously slows it down further. Since you should have some idea of the size of your data, I would recommend calling sample and then cutting the sample down to size yourself; you know more about your data than Spark does.
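
A minimal sketch of the "cut it down yourself" step, assuming the oversized sample is small enough to bring back to the driver (for example with collect()). The helper name trim_to_size is hypothetical, and the list below merely stands in for the collected rows.

```python
import random


def trim_to_size(collected_rows, k, seed):
    """Pick exactly k rows uniformly at random from an already-collected sample.

    Hypothetical helper: returns every row if fewer than k rows arrived.
    """
    if len(collected_rows) <= k:
        return list(collected_rows)
    return random.Random(seed).sample(collected_rows, k)


# Stand-in for something like textFile.sample(False, fraction, seed).collect()
oversized = ["line %d" % i for i in range(37)]
ten_lines = trim_to_size(oversized, 10, 12345)
print(len(ten_lines))  # prints 10
```

Trimming at random, rather than keeping the first ten rows that come back, is a design choice here: it should avoid favoring rows from whichever partitions happen to be read first.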

21
votes

Using sample instead of takeSample appears to make things reasonably fast:

textFile.sample(False, .0001, 12345)

The problem with this is that it's hard to know the right fraction to choose unless you have a rough idea of the number of rows in your data set.
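
If a rough row count is available, one way to pick the fraction is to aim for a few times more rows than needed, so that a shortfall is unlikely. The sketch below assumes a guessed row count; the helper name fraction_for and the safety factor of 3 are illustrative choices, not anything Spark prescribes.

```python
def fraction_for(k, approx_rows, safety=3.0):
    """Fraction whose expected sample size is about safety * k rows.

    Hypothetical helper; `approx_rows` is a rough guess, not an exact count.
    """
    if approx_rows <= 0:
        raise ValueError("approx_rows must be positive")
    # Cap at 1.0: a fraction above 1 is meaningless without replacement.
    return min(1.0, safety * k / float(approx_rows))


# Roughly 300,000 rows guessed, 10 wanted: aim for about 30 sampled rows.
fraction = fraction_for(10, 300000)
print(fraction)
```

The result can be passed as the second argument, as in textFile.sample(False, fraction, 12345). If fewer than 10 rows come back, retrying with a larger safety factor is still far cheaper than counting the whole data set.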

0
votes

Different Types of Sample in PySpark

Randomly sample % of the data with and without replacement

import pyspark.sql.functions as F
#Randomly sample 50% of the data without replacement
sample1 = df.sample(False, 0.5, seed=0)

#Randomly sample 50% of the data with replacement
#(stored under its own name so it does not overwrite sample1)
sample1_with_replacement = df.sample(True, 0.5, seed=0)

#Take another sample excluding records from previous sample using Anti Join
sample2 = df.join(sample1, on='ID', how='left_anti').sample(False, 0.5, seed=0)

#Take another sample excluding records from previous sample using Where
#collect() brings the sampled IDs to the driver; a Column cannot be iterated directly
sample1_ids = [row['ID'] for row in sample1.select('ID').collect()]
sample2 = df.where(~F.col('ID').isin(sample1_ids)).sample(False, 0.5, seed=0)

#Generate a stratified sample of the data across a column
#Sampling is probabilistic and thus cannot guarantee an exact number of rows
fractions = {
    'NJ': 0.5, #Take about 50% of records where state = NJ
    'NY': 0.25, #Take about 25% of records where state = NY
    'VA': 0.1, #Take about 10% of records where state = VA
}
#Passing the column name as a string also works on Spark versions before 3.0
stratified_sample = df.sampleBy('state', fractions, seed=0)