0
votes

Can someone help me create a multiprocessing version of the makeSubsets function? It basically calculates an index using the degree function and adds the element index (referring to the input set) to the subsets vector at the position calculated by the degree function.

My functional code, without multiprocessing.

import numpy as np
import time
from scipy.spatial import distance
import math

class Testing(object):
    
    def __init__(self, inputSet, divisor):
        self.set = inputSet
        self.divisor = divisor
        
    def __build__(self):

        self.reference = self.__reference__()
        self.num_subsets = ((len(self.set)-1) // self.divisor)
        self.subsets = [[] for i in range(self.num_subsets)]
        
        self.__makeSubsets__()
       
    def __degree__(self, xk, c1, c2):
        
        d_xk_c1 = distance.euclidean(xk, c1)
        d_xk_c2 = distance.euclidean(xk, c2)

            
        return d_xk_c1/(d_xk_c1 + d_xk_c2)
  
    def __makeSubsets__(self):
        start_time = time.time()
        for i in range(len(self.set)):
            index = int(self.__degree__(self.set[i], self.reference[0], self.reference[1]) * self.num_subsets)
            self.subsets[index].append(i)
        print("MAKE SUBSETS --- %s seconds ---" % (time.time() - total_time))
        
    def __reference__(self):
        dimension = len(self.set[0])
        size = len(self.set)

        #reference[0] = min of set iset; reference[1] = max of set iset
        reference = [[], []]

        for i in range(dimension):
            reference[0].append(math.inf)
            reference[1].append(-math.inf)

        for i in range(size):
            for j in range(dimension):
                if self.set[i][j] < reference[0][j]:
                    reference[0][j] = self.set[i][j]

                if self.set[i][j] > reference[1][j]:
                    reference[1][j] = self.set[i][j]
        return reference

if __name__ == '__main__':    
    total_time = time.time()    
    N = 10000
    Set = np.random.random((N,2))

    anObjetct = Testing(inputSet = Set.tolist(), divisor = 1000)

    anObjetct .__build__()
    print("TOTAL --- %s seconds ---" % (time.time() - total_time))

EXAMPLE INPUT

10 vectors of two positions (as points in two-dimensional space)

N = 10
Set = np.random.random((N,2))

OUTPUT

Subsets containing input set slices. The values in the subsets correspond to the indexes of the elements in the input vector.

>>> objeto.subsets
[[0, 2, 9], [5, 7], [1, 3, 6, 8], [4]]
Execution time: 0.009 seconds

-----

I tried to create a version with multiprocessing, but it does not present the correct result, nor does it produce any results. Furthermore, the processing time is totally inconsistent.

import multiprocessing as mp
import numpy as np
import time
from scipy.spatial import distance
import math

class Testing(object):
    
    def __init__(self, inputSet, divisor):
        self.set = inputSet
        self.divisor = divisor
        
    def __build__(self):

        self.reference = self.__reference__()
        self.num_subsets = ((len(self.set)-1) // self.divisor)
        self.subsets = [[] for i in range(self.num_subsets)]

        with mp.Pool() as pool:
            pool.map(self.__makeParallel__,self.set)
        
        #self.__makeSubsets__()
       
    def __degree__(self, xk, c1, c2):
        
        d_xk_c1 = distance.euclidean(xk, c1)
        d_xk_c2 = distance.euclidean(xk, c2)

            
        return d_xk_c1/(d_xk_c1 + d_xk_c2)

    def __makeParallel__(self, i):
        index = math.floor(self.__degree__(i, self.reference[0], self.reference[1]) * self.num_subsets)
        self.subsets[index].append(self.set.index(i))
  
    def __makeSubsets__(self):
        start_time = time.time()
        for i in range(len(self.set)):
            index = int(self.__degree__(self.set[i], self.reference[0], self.reference[1]) * self.num_subsets)
            self.subsets[index].append(i)
        print("MAKE SUBSETS --- %s seconds ---" % (time.time() - total_time))
        
    def __reference__(self):
        dimension = len(self.set[0])
        size = len(self.set)

        #reference[0] = min of set iset; reference[1] = max of set iset
        reference = [[], []]

        for i in range(dimension):
            reference[0].append(math.inf)
            reference[1].append(-math.inf)

        for i in range(size):
            for j in range(dimension):
                if self.set[i][j] < reference[0][j]:
                    reference[0][j] = self.set[i][j]

                if self.set[i][j] > reference[1][j]:
                    reference[1][j] = self.set[i][j]
        return reference

if __name__ == '__main__':    
    total_time = time.time()    
    N = 10000
    Set = np.random.random((N,2))

    anObjetct = Testing(inputSet = Set.tolist(), divisor = 1000)

    anObjetct .__build__()
    print("TOTAL --- %s seconds ---" % (time.time() - total_time))

EXAMPLE INPUT

10 vectors of two positions (as points in two-dimensional space)

N = 10
Set = np.random.random((N,2))

OUTPUT

Totally inconsistent result.

>>> objeto.subsets
[[], [], [], []]
Execution time: 0.5 seconds

I just need a multiprocessing version of the makeSubsets function. pls, help! thanks

1
In multiprocessing you generally need to pass stuff into the processes and collect output. You then stitch together the result. I don't think your various subprocesses will be using the same instance of the class Testing. - forgetso
@forgetso do you have any suggestions on how i could implement multiprocessing or multithreading for the makeSubsets function? - thmsagc
Yes, I've added a solution that will work with your existing class. However, I would recommend instead doing this as a "bag of functions". The class is overly complicated. - forgetso

1 Answers

0
votes

The class is overkill for this problem, in my opinion, but if you want to keep it you could instantiate one of the classes in each sub processes as follows.

import numpy as np
import time
from scipy.spatial import distance
import math

class Testing(object):
    
    def __init__(self, inputSet, divisor):
        self.set = inputSet
        self.divisor = divisor
        
    def __build__(self):

        self.reference = self.__reference__()
        self.num_subsets = ((len(self.set)-1) // self.divisor)
        self.subsets = [[] for i in range(self.num_subsets)]
        self.__makeSubsets__()
       
    def __degree__(self, xk, c1, c2):
        
        d_xk_c1 = distance.euclidean(xk, c1)
        d_xk_c2 = distance.euclidean(xk, c2)
        return d_xk_c1/(d_xk_c1 + d_xk_c2)

  
    def __makeSubsets__(self):
        start_time = time.time()
        for i in range(len(self.set)):
            index = int(self.__degree__(self.set[i], self.reference[0], self.reference[1]) * self.num_subsets)
            self.subsets[index].append(i)
        
    def __reference__(self):
        dimension = len(self.set[0])
        size = len(self.set)

        #reference[0] = min of set iset; reference[1] = max of set iset
        reference = [[], []]

        for i in range(dimension):
            reference[0].append(math.inf)
            reference[1].append(-math.inf)

        for i in range(size):
            for j in range(dimension):
                if self.set[i][j] < reference[0][j]:
                    reference[0][j] = self.set[i][j]

                if self.set[i][j] > reference[1][j]:
                    reference[1][j] = self.set[i][j]
        return reference


import multiprocessing
# split a list into evenly sized chunks
def chunks(lst, n):
    if n < 1:
        n = 1
    result = [x for x in np.array_split(lst, n) if len(x)]
    return result

# this is the function that we actually run in parallel
# each instance has its own version of Testing class
def get_subsets(chnk):
    anObjetct = Testing(inputSet = chnk.tolist(), divisor = 1000)
    anObjetct .__build__()
    return anObjetct.subsets


# this code breaks the random data into 4 chunks and then passes these chunks to
# the multiprocessing job pool.
N = 10000
data = np.random.random((N,2))
chnks = chunks(data, 4)

pool = multiprocessing.Pool(processes=4)
result_list = pool.map(get_subsets, chnks)

# a testing class is constructed at the end and the subsets from the pool are assigned.
# note the length of result_list is 4 as there were 4 processes
# you can merge these into a single list if that's what you need
anObjetct = Testing(None, None)
anObjetct.subsets = result_list