I am using Python 3 in Jupyter under the Anaconda distribution with the sgt package 2.0.3, on a 64-bit system with 8 GB of RAM. The SGT function works fine when I do not use multiprocessing, but it throws an error when I do. Is there any system dependency I need in order to use the multiprocessing functionality?

from sgt import SGT
import numpy as np
import pandas as pd
import pandarallel

# Two toy sequences keyed by id
corpus = pd.DataFrame([[1, ["B", "B", "A", "C", "A", "C", "A", "A", "B", "A"]],
                       [2, ["C", "Z", "Z", "Z", "D"]]],
                      columns=['id', 'sequence'])

# Single-process embedding: this works as expected
sgt = SGT(kappa=1,
          flatten=True,
          lengthsensitive=False,
          mode='default')
sgt.fit_transform(corpus)

However, when I run with mode='multiprocessing' it throws the following error:

# Switching to multiprocessing mode raises the error below
sgt = SGT(kappa=1,
          flatten=True,
          lengthsensitive=False,
          mode='multiprocessing')
sgt.fit_transform(corpus)

Output:

    INFO: Pandarallel will run on 7 workers.
    INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

    ---------------------------------------------------------------------------
    AttributeError                            Traceback (most recent call last)
    <ipython-input-...> in <module>
          3           lengthsensitive=False,
          4           mode='multiprocessing')
    ----> 5 sgt.fit_transform(corpus)

    ~\AppData\Local\Continuum\anaconda3\lib\site-packages\sgt\sgt.py in fit_transform(self, corpus)
        214                                         list(self.fit(x['sequence'])),
        215                                         axis=1,
    --> 216                                         result_type='expand')
        217             sgt.columns = ['id'] + self.feature_names
        218             return sgt

    ~\AppData\Local\Continuum\anaconda3\lib\site-packages\pandarallel\pandarallel.py in closure(data, func, *args, **kwargs)
        440         try:
        441             pool = Pool(
    --> 442                 nb_workers, worker_init, (prepare_worker(use_memory_fs)(worker),),
        443             )
        444 

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
        117         from .pool import Pool
        118         return Pool(processes, initializer, initargs, maxtasksperchild,
    --> 119                     context=self.get_context())
        120 
        121     def RawValue(self, typecode_or_type, *args):

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
        174         self._processes = processes
        175         self._pool = []
    --> 176         self._repopulate_pool()
        177 
        178         self._worker_handler = threading.Thread(

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py in _repopulate_pool(self)
        239             w.name = w.name.replace('Process', 'PoolWorker')
        240             w.daemon = True
    --> 241             w.start()
        242             util.debug('added worker')
        243 

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py in start(self)
        110                'daemonic processes are not allowed to have children'
        111         _cleanup()
    --> 112         self._popen = self._Popen(self)
        113         self._sentinel = self._popen.sentinel
        114         # Avoid a refcycle if the target function holds an indirect

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
        320         def _Popen(process_obj):
        321             from .popen_spawn_win32 import Popen
    --> 322             return Popen(process_obj)
        323 
        324     class SpawnContext(BaseContext):

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
         87             try:
         88                 reduction.dump(prep_data, to_child)
    ---> 89                 reduction.dump(process_obj, to_child)
         90             finally:
         91                 set_spawning_popen(None)

    ~\AppData\Local\Continuum\anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
         58 def dump(obj, file, protocol=None):
         59     '''Replacement for pickle.dump() using ForkingPickler.'''
    ---> 60     ForkingPickler(file, protocol).dump(obj)
         61 
         62 #

    AttributeError: Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper'
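
For context on the error itself: on Windows, multiprocessing starts worker processes with the spawn method, which has to pickle the pool's initializer, and a function defined inside another function (such as pandarallel's wrapper above) cannot be pickled by the standard library pickler. A minimal sketch, independent of sgt, that reproduces the same class of failure:

import pickle

def make_wrapper():
    def wrapper(x):   # defined inside another function, so pickle cannot locate it by an importable name
        return x
    return wrapper

try:
    pickle.dumps(make_wrapper())
except (AttributeError, pickle.PicklingError) as err:
    print(err)        # Can't pickle local object 'make_wrapper.<locals>.wrapper'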

Best way to get an answer here is to create an issue at the GitHub repo: github.com/cran2367/sgt/issues – kspr

1 Answer


This is possibly due to the pandas version. Check whether your installed pandas is 1.x; if not, upgrade it with the command below.
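
One quick way to check from the notebook (this only uses the standard pandas version attribute):

import pandas as pd
print(pd.__version__)   # per the suggestion above, this should report a 1.x release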

$ pip install pandas --upgrade
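
After the upgrade, restart the Jupyter kernel so the new pandas is actually loaded, then re-run sgt.fit_transform(corpus) with mode='multiprocessing'.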