I am getting the following error when trying to run some code through spark-submit or Zeppelin: "_pickle.PicklingError: args[0] from __newobj__ args has the wrong class"
I have looked around at posts with the same problem and haven't found much insight into this issue.
The traceback (included below) points to one of the UDFs I use:
def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

udf_stop_words = udf(stop_words, ArrayType(StringType()))
Both the input and output of the function are lists of strings. These are 3 rows from the input:
[Row(split_tokenized_activity_description=['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'more', 'intense', '45', 'minute', 'version', 'of', 'a', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'relaxing', '45', 'minute', 'Swedish', 'style', 'massage'])]
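As a sanity check, the function behaves as expected when run as plain Python on the first of those rows (a quick sketch, assuming the NLTK stopwords corpus has already been downloaded with nltk.download("stopwords")):

from nltk.corpus import stopwords

def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

print(stop_words(['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage']))
# prints ['delightful', '45', 'minute', 'swedish', 'style', 'massage']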
This is a snippet of the code I'm working with.
import re
from nltk.corpus import stopwords
from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, StringType

def special_car(x):
    # Remove special characters, replacing them with a space
    return [re.sub('[^A-Za-z0-9]+', ' ', x)]

# Create UDF from function
udf_special_car = udf(special_car, ArrayType(StringType()))

# Function to remove stop words
def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

udf_stop_words = udf(stop_words, ArrayType(StringType()))

# Load in data
df_tags = spark.sql("select * from database")

# Remove special characters
df1_tags = df_tags.withColumn('tokenized_name', udf_special_car(df_tags.name))
df2_tags = df1_tags.withColumn('tokenized_description', udf_special_car(df1_tags.description))

# Select only relevant columns
df3_tags = df2_tags.select(['tag_id', 'tokenized_name', 'tokenized_description'])

# Tokenize tag_name and tag_desc (separate on spaces, using pyspark.sql.functions.split)
df4_tags = df3_tags.withColumn('split_tokenized_name', split(df3_tags['tokenized_name'].getItem(0), ' '))
df5_tags = df4_tags.withColumn('split_tokenized_description', split(df4_tags['tokenized_description'].getItem(0), ' '))

# Select only relevant columns
df6_tags = df5_tags.select(['tag_id', 'split_tokenized_name', 'split_tokenized_description'])

# Remove stop words
df7_tags = df6_tags.withColumn('stop_words_tokenized_name', udf_stop_words(df6_tags.split_tokenized_name))
df8_tags = df7_tags.withColumn('stop_words_tokenized_description', udf_stop_words(df7_tags.split_tokenized_description))
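To rule out the Hive table itself, the same pipeline can be reproduced on a toy DataFrame (a sketch; the row here is made up, spark is the active SparkSession, and the UDFs are the ones defined above):

# Hypothetical one-row DataFrame mirroring the real table's schema
df_test = spark.createDataFrame(
    [(1, 'Swedish massage!', 'A delightful 45 minute Swedish-style massage.')],
    ['tag_id', 'name', 'description'])
df_t1 = df_test.withColumn('tokenized_description', udf_special_car(df_test.description))
df_t2 = df_t1.withColumn('split_tokenized_description', split(df_t1['tokenized_description'].getItem(0), ' '))
df_t2.withColumn('stop_words_tokenized_description', udf_stop_words(df_t2.split_tokenized_description)).show(truncate=False)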
Oddly, the first two times I run my code through Zeppelin I get this error, but on the third try it runs just fine and the output is what I expect. Zeppelin is only for testing, though; I need to get this running through spark-submit.
Traceback (most recent call last):
  File "/tmp/testing_test.py", line 262, in <module>
    udf_stop_words = udf(stop_words, ArrayType(StringType()))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1872, in udf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1830, in __init__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1835, in _create_judf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1815, in _wrap_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2359, in _prepare_for_python_RDD
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 460, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 703, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 147, in dump
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 248, in save_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 296, in save_function_tuple
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 852, in _batch_setitems
    save(v)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 564, in save_reduce
_pickle.PicklingError: args[0] from __newobj__ args has the wrong class
I have tried several things to resolve this issue, and none of them have worked. They all return the same error.
I have tried changing the UDF to a one-line lambda function:
udf(lambda words: list(word.lower() for word in words if word.lower() not in stopwords.words('english')), ArrayType(StringType()))
I have tried changing the UDF to return a string:
udf_stop_words = udf(stop_words, StringType())
and changing the function a bit to match:
def stop_words(words):
    return str(word.lower() for word in words if word.lower() not in stopwords.words('english'))
I have tried to define it as a StructType with both:
udf_stop_words = udf(stop_words, StructType([StructField("words", ArrayType(StringType()), False)]))
and
udf_stop_words = udf(stop_words, StructType([StructField("words", StringType(), False)])).
I have also tried many combinations of the above.
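One variant I have not tried yet (an untested sketch): materializing the stopword list into a plain Python set before defining the UDF, so the pickled closure only captures a built-in set rather than anything from NLTK, and the corpus is not re-read on every row:

# Untested sketch: capture the stopwords as a plain set so the UDF's closure
# pickles only built-in objects
english_stops = set(stopwords.words('english'))

def stop_words(words):
    return [word.lower() for word in words if word.lower() not in english_stops]

udf_stop_words = udf(stop_words, ArrayType(StringType()))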