
I have two files in HDFS with the same number of lines. The lines of the two files correspond to each other by line number.

lines1 = sc.textFile('1.txt')
lines2 = sc.textFile('2.txt')

My question is: how do I correctly zip RDD lines1 with lines2?

zipped = lines1.zip(lines2)

zip requires the RDDs to have the same size and the same partitioning (as I understand it, not only the same number of partitions but also an equal number of elements in each partition). The first requirement is already satisfied. How do I ensure the second one?
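For example, zip fails outright when the partition counts differ. A toy sketch with parallelize (since I can't control how textFile partitions the files):

# same total number of elements, but a different number of partitions
a = sc.parallelize(range(6), 2)
b = sc.parallelize(range(6), 3)

try:
    a.zip(b).collect()
except Exception as e:
    # zip rejects RDDs whose partitioning does not match
    print(type(e).__name__, e)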

Thanks!

Sergey.


1 Answer


In general, neither condition will be satisfied, and zip is not a good tool for an operation like this. Both the number of partitions and the number of elements per partition depend not only on the number of lines but also on the sizes of the individual files and on the configuration.
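You can check this for the RDDs from the question with getNumPartitions and glom:

# number of partitions Spark chose for each file
print(lines1.getNumPartitions(), lines2.getNumPartitions())

# number of elements in each partition; even with equal totals,
# these per-partition counts will generally differ between the RDDs
print(lines1.glom().map(len).collect())
print(lines2.glom().map(len).collect())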

zip is useful when you combine RDDs which have a common ancestor and are not separated by a shuffle, for example:

parent = sc.parallelize(range(100))
# some_func / other_func stand for arbitrary per-element functions;
# both children share the parent's lineage and partitioning
child1 = parent.map(some_func)
child2 = parent.map(other_func)
child1.zip(child2)

To merge the RDDs by line number you can do something like this:

def index_and_sort(rdd):
    # zipWithIndex yields (element, index); swap turns that into
    # (index, element) so the line number can be used as the key
    def swap(xy):
        x, y = xy
        return y, x
    return rdd.zipWithIndex().map(swap).sortByKey()

index_and_sort(lines1).join(index_and_sort(lines2)).values()
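A quick sanity check on in-memory data (left and right are toy stand-ins for the textFile RDDs):

left = sc.parallelize(["a", "b", "c"], 2)
right = sc.parallelize(["x", "y", "z"], 3)

# each element is a (line_from_left, line_from_right) pair;
# sorted() only makes the printed output deterministic
print(sorted(index_and_sort(left).join(index_and_sort(right)).values().collect()))
# [('a', 'x'), ('b', 'y'), ('c', 'z')]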

It should be safe to zip after indexing and sorting:

from pyspark import RDD

RDD.zip(*(index_and_sort(rdd).values() for rdd in [lines1, lines2]))
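
which is just a compact way of writing the ordinary method call:

index_and_sort(lines1).values().zip(index_and_sort(lines2).values())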

but why even bother?

Scala equivalent:

import org.apache.spark.rdd.RDD

def indexAndSort(rdd: RDD[String]) = rdd.zipWithIndex.map(_.swap).sortByKey()

indexAndSort(lines1).join(indexAndSort(lines2)).values