1 vote

I am getting the following error when trying to run some code through spark-submit or Zeppelin: "_pickle.PicklingError: args[0] from __newobj__ args has the wrong class"

I have looked around at posts with the same problem and haven't gained much insight into this issue.

The traceback (included below) points to one of the UDFs I use:

udf_stop_words = udf(stop_words, ArrayType(StringType()))

where stop_words is defined as:

def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

Both the input and output of the function are lists of strings. These are 3 rows from the input:

[Row(split_tokenized_activity_description=['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'more', 'intense', '45', 'minute', 'version', 'of', 'a', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'relaxing', '45', 'minute', 'Swedish', 'style', 'massage'])]
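(For reference, this input can be recreated with something like the sketch below, assuming an active SparkSession named spark; the sample variable name is made up here.)

# Hypothetical reproduction of the three sample rows shown above
sample = spark.createDataFrame(
    [(['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage'],),
     (['A', 'more', 'intense', '45', 'minute', 'version', 'of', 'a', 'Swedish', 'style', 'massage'],),
     (['A', 'relaxing', '45', 'minute', 'Swedish', 'style', 'massage'],)],
    ['split_tokenized_activity_description'])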

This is a snippet of the code I'm working with.

import re
from nltk.corpus import stopwords
from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, StringType

def special_car(x):
    # Replace special characters with a space
    return [re.sub('[^A-Za-z0-9]+', ' ', x)]

# Create UDF from function
udf_special_car = udf(special_car, ArrayType(StringType()))

# Function to remove stops words
def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

udf_stop_words = udf(stop_words, ArrayType(StringType()))

# Load in data
df_tags = spark.sql("select * from database")

# Remove special characters
df1_tags = df_tags.withColumn('tokenized_name', udf_special_car(df_tags.name))
df2_tags = df1_tags.withColumn('tokenized_description', udf_special_car(df1_tags.description))

# Select only relevant columns
df3_tags = df2_tags.select(['tag_id', 'tokenized_name', 'tokenized_description'])

# Tokenize tag_name and tag_desc, separating on spaces (this uses pyspark.sql.functions.split)
df4_tags = df3_tags.withColumn('split_tokenized_name', split(df3_tags['tokenized_name'].getItem(0), ' '))
df5_tags = df4_tags.withColumn('split_tokenized_description', split(df4_tags['tokenized_description'].getItem(0), ' '))

# Select only relevant columns
df6_tags = df5_tags.select(['tag_id', 'split_tokenized_name', 'split_tokenized_description'])

# Remove Stop words
df7_tags = df6_tags.withColumn('stop_words_tokenized_name', udf_stop_words(df6_tags.split_tokenized_name))
df8_tags = df7_tags.withColumn('stop_words_tokenized_description', udf_stop_words(df7_tags.split_tokenized_description))

Oddly, the first two times I run my code through Zeppelin I get the error, but on the third try it runs just fine and the output is what I expect. Zeppelin is only for testing, though; I need to get this to run through spark-submit.

Traceback (most recent call last):
  File "/tmp/testing_test.py", line 262, in <module>
    udf_stop_words = udf(stop_words, ArrayType(StringType()))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1872, in udf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1830, in __init__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1835, in _create_judf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1815, in _wrap_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2359, in _prepare_for_python_RDD
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 460, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 703, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 147, in dump
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 248, in save_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 296, in save_function_tuple
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 852, in _batch_setitems
    save(v)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 564, in save_reduce
_pickle.PicklingError: args[0] from __newobj__ args has the wrong class

I have tried several things to resolve this issue, and none of them have worked. They all return the same error.

I have tried changing the UDF to be a one-line lambda function:

udf(lambda words: list(word.lower() for word in words if word.lower() not in stopwords.words('english')), ArrayType(StringType()))

I have tried changing the UDF to return a string:

udf_stop_words = udf(stop_words, StringType())

and changing the function a bit to match:

def stop_words(words):
    return str(word.lower() for word in words if word.lower() not in stopwords.words('english'))

I have tried to define it as a StructType with both:

udf_stop_words = udf(stop_words, StructType([StructField("words", ArrayType(StringType()), False)])) 

and

udf_stop_words = udf(stop_words, StructType([StructField("words", StringType(), False)]))

I have also tried many combinations of the above.

Can you add more context to your application (like all the code and sample data), and the full traceback for the error? – Mariusz

2 Answers

1 vote

The return type should be an ArrayType(StringType()).

I am not sure about this, but the problem may come from nltk not being installed on your worker nodes (or the stopwords corpus never having been downloaded there). Since stopwords.words("english") is called inside your UDF, the call happens on the nodes, and it can fail there because it can't find the corpus.
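A quick way to check is to make sure the corpus is actually available wherever Python runs; for example (assuming nltk is importable on that machine):

import nltk

# Download the stopwords corpus if it is missing; a LookupError from
# stopwords.words("english") usually means this download never happened there.
nltk.download("stopwords")

from nltk.corpus import stopwords
print(stopwords.words("english")[:5])  # ['i', 'me', 'my', 'myself', 'we']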

Since stopwords.words("english") is just a list, you should build it once on your driver and then broadcast it to the nodes:

from nltk.corpus import stopwords
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as psf

# Build the stop-word list once on the driver and broadcast it to the executors;
# using a set makes the membership test fast
english_stopwords = sc.broadcast(set(stopwords.words("english")))

def stop_words(words):
    return [word.lower() for word in words if word.lower() not in english_stopwords.value]

udf_stop_words = psf.udf(stop_words, ArrayType(StringType()))
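Applied to the DataFrames from your question, the calls themselves don't change; only the UDF definition does:

# Same calls as in the question, now backed by the broadcast stop-word set
df7_tags = df6_tags.withColumn('stop_words_tokenized_name', udf_stop_words(df6_tags.split_tokenized_name))
df8_tags = df7_tags.withColumn('stop_words_tokenized_description', udf_stop_words(df7_tags.split_tokenized_description))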

0 votes

I had a similar issue. In my case the exception was being thrown because I had defined a class inside my Spark script itself. It was resolved by moving the class definition and its methods into a separate .py file, adding that file to the job with sc.addPyFile(path), and finally importing it with from FileName import *.
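As a sketch, with a hypothetical module named helpers.py holding the class:

# helpers.py -- separate file shipped to the executors
class Cleaner(object):
    def clean(self, text):
        return text.lower().strip()

# main Spark script
sc.addPyFile("/path/to/helpers.py")  # make the module importable on the workers
from helpers import *

cleaner = Cleaner()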