
I am getting an error while running the Spark code below. I am trying to find the sum of all the vectors, grouped by key. Each input line starts with a key (an integer) followed by 127 floating-point numbers that form a single 127-dimensional vector, i.e. each line is a key plus a vector.


from cStringIO import StringIO

class testing:
    def __str__(self):
        file_str = StringIO()
        for n in self.vector:
            file_str.write(str(n)) 
            file_str.write(" ")
        return file_str.getvalue()
    def __init__(self,txt="",initial=False):
        self.vector = [0.0]*128
        if len(txt)==0:
            return
        i=0
        for n in txt.split():
            if i<128:
                self.vector[i]=float(n)
                i = i+1
                continue
            self.filename=n
            break
    def addVec(self,r):
        # element-wise sum of two vectors (this references self,
        # so it must be a method of the testing class)
        a = testing()
        for n in xrange(0,128):
            a.vector[n] = self.vector[n] + r.vector[n]
        return a

def InitializeAndReturnPair(string,first=False):
    # every line gets key 1, so reduceByKey combines all vectors
    vec = testing(string,first)
    return 1,vec


from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()

An example line from input.txt:

6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 15.0 113.0 53.0 139.0 156.0 0.0 0.0 0.0 156.0 29.0 1.0 38.0 59.0 0.0 0.0 0.0 28.0 4.0 2.0 9.0 1.0 0.0 0.0 0.0 9.0 83.0 13.0 1.0 0.0 9.0 42.0 7.0 41.0 71.0 74.0 123.0 35.0 17.0 7.0 2.0 156.0 27.0 6.0 33.0 11.0 2.0 0.0 11.0 35.0 4.0 2.0 4.0 1.0 3.0 2.0 4.0 0.0 0.0 0.0 0.0 2.0 19.0 45.0 17.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 11.0 2.0 29.0 35.0 6.0 5.0 9.0 4.0 2.0 1.0 3.0 1.0 0.0 0.0 0.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.0 0.0 1.0 2.0

Below is the error that I am getting. It comes from the last line of the code, i.e. output.reduceByKey.

Error message - http://pastebin.com/0tqiiJQm

I'm not really sure how to approach this problem. I tried using a MarshalSerializer, but it gives the same issue.

------------------------------Answer------------------------------------

I got an answer from the Apache user list for this same question. Basically, the mapper/reducer that runs on the cluster does not have the class definition, so we have to ship the class by writing it in a separate module and attaching it while configuring the SparkContext, using

sc.addPyFile(os.path.join(HOMEDirectory, "module.py"))
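
For completeness, here is a minimal sketch of that setup, assuming the testing class above has been moved into a file called module.py and that HOMEDirectory (a hypothetical placeholder) is the directory containing it:

import os
from pyspark import SparkConf, SparkContext
from module import testing   # module.py holds the testing class

HOMEDirectory = "/path/to/project/"   # hypothetical; wherever module.py lives

conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

# ship module.py to the workers so tasks can unpickle testing objects
sc.addPyFile(os.path.join(HOMEDirectory, "module.py"))

inp = sc.textFile("input.txt")
pairs = inp.map(lambda s: (1, testing(s, True)))
print pairs.reduceByKey(lambda a, b: a.addVec(b)).collect()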

Thanks all for helping me out.


1 Answer


You could use NumPy arrays, which work well with Spark.

import numpy as np

def row_map(text):
    split_text = text.split()
    # the first element is the key; build a numpy array
    # from the remaining elements
    return split_text[0], np.array([float(v) for v in split_text[1:]])

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(row_map).cache()
# sum the vectors element-wise per key
print output.reduceByKey(lambda a,b : np.add(a,b)).collect()

This is a lot more concise and Pythonic.
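
If you then want to write the summed vectors back out in the same key-plus-numbers text format, here is a minimal sketch (the output path "summed_output" is just a placeholder):

summed = output.reduceByKey(lambda a, b: np.add(a, b))
# format each pair as "key v1 v2 ... vN" and save as text
summed.map(lambda kv: str(kv[0]) + " " + " ".join(str(x) for x in kv[1])) \
      .saveAsTextFile("summed_output")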