19 votes

I'm trying to do some NLP text cleanup on some Unicode columns in a PySpark DataFrame. I've tried Spark 1.3, 1.5, and 1.6 and can't seem to get things to work for the life of me. I've also tried both Python 2.7 and Python 3.4.

I've created an extremely simple UDF, shown below, that should just return a string for each record in a new column. Other functions will manipulate the text and then return the changed text in a new column.

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

def dummy_function(data_str):
    cleaned_str = 'dummyData' 
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())

Some sample data can be unzipped from here.

Here is the code I use to import the data and then apply the UDF.

# Load a text file and convert each line to a Row.
lines = sc.textFile("classified_tweets.txt")
parts = lines.map(lambda l: l.split("\t"))
training = parts.map(lambda p: (p[0], p[1]))

# Create dataframe
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])

training_df.show(5)
+--------------------+--------------+
|               tweet|classification|
+--------------------+--------------+
|rt @jiffyclub: wi...|        python|
|rt @arnicas: ipyt...|        python|
|rt @treycausey: i...|        python|
|what's my best op...|        python|
|rt @raymondh: #py...|        python|
+--------------------+--------------+

# Apply UDF function
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show(5)

When I run df.show(5) I get the following error. I understand that the problem most likely doesn't stem from show() itself, but the trace doesn't give me much help.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-0b21c233c724> in <module>()
      1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
----> 2 df.show(5)
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    255         +---+-----+
    256         """
--> 257         print(self._jdf.showString(n, truncate))
    258 
    259     def __repr__(self):
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

The actual function I'm trying to use:

from nltk import pos_tag  # pos_tag comes from NLTK

def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str


tag_and_remove_udf = udf(tag_and_remove, StringType())
Are you sure l.split('\t') returns more than one item? The index error is likely from training = parts.map(...). What does your data look like - are you sure tabs are used everywhere? – AChampion
Yes, I can confirm that the data has two columns. I scrubbed the data of all whitespace besides spaces before putting it in the flat file. I'll put a small sample up top. – dreyco676
You aren't splitting on whitespace, only tabs; l.split() would split on any whitespace. – AChampion
Are you able to successfully load the DataFrame? Can you run training_df.show() to confirm that it's not a problem with the original data? – Kirk Broadhurst
There are 985 lines in your data set that have only 1 field (149,195 have 2 fields). – AChampion

3 Answers

10 votes

Your dataset isn't clean: 985 lines split on '\t' into only one value:

>>> from operator import add
>>> lines = sc.textFile("classified_tweets.txt")
>>> parts = lines.map(lambda l: l.split("\t"))
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect()
[(2, 149195), (1, 985)]
>>> parts.filter(lambda l: len(l) == 1).take(5)
[['"show me the money!”  at what point do you start trying to monetize your #startup? tweet us with #startuplife.'],
 ['a good pitch can mean money in the bank for your #startup. see how body language plays a key role:  (via: ajalumnify)'],
 ['100+ apps in five years? @2359media did it using microsoft #azure:  #azureapps'],
 ['does buying better coffee make you a better leader? little things can make a big difference:  (via: @jmbrandonbb)'],
 ['.@msftventures graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']]

So changing your code to:

>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip()))
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
>>> df.show(5)
+--------------------+--------------+---------+
|               tweet|classification|    dummy|
+--------------------+--------------+---------+
|rt @jiffyclub: wi...|        python|dummyData|
|rt @arnicas: ipyt...|        python|dummyData|
|rt @treycausey: i...|        python|dummyData|
|what's my best op...|        python|dummyData|
|rt @raymondh: #py...|        python|dummyData|
+--------------------+--------------+---------+
only showing top 5 rows
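
If you'd rather keep those malformed lines than drop them, a rough alternative (just a sketch, assuming at most two tab-separated fields per line) is to pad the missing classification with None instead of filtering:

def parse_line(line):
    # Keep every line; pad a missing classification with None
    # (assumes at most two tab-separated fields per line).
    fields = line.split("\t")
    tweet = fields[0]
    classification = fields[1].strip() if len(fields) > 1 else None
    return (tweet, classification)

training = lines.map(parse_line)
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])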

7 votes

I think you're misdefining the problem; you may have simplified your lambda for the purposes of this question, but that simplification is hiding the real problem.

Your stack trace reads:

File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range

When I run this code it works fine:

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification")

def dummy_function(data_str):
    cleaned_str = 'dummyData'
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show()

+-----+--------------+---------+
|tweet|classification|    dummy|
+-----+--------------+---------+
|  foo|           bar|dummyData|
+-----+--------------+---------+

Are you sure there isn't some other bug in your dummy_function_udf? What is the 'real' UDF you are using, apart from this sample version?
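
In the same spirit, the 'real' function can be exercised as plain Python before wrapping it in a UDF, so any error surfaces as a full local traceback rather than through the Py4J wrapper. A rough sketch against the tag_and_remove from the question (assuming nltk and its tagger models are installed on the driver):

from nltk import pos_tag

# Call the function directly on a sample string; bugs in the function itself
# (or missing nltk resources) will show up here, independent of Spark.
sample = "the quick brown fox jumps over the lazy dog"
print(tag_and_remove(sample))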

0 votes

The one below works with Spark 2:

import hashlib
from pyspark.sql.types import StringType

def customencoding(s):
    # Return the MD5 hex digest of the input string.
    m = hashlib.md5()
    m.update(s.encode('utf-8'))
    d = m.hexdigest()
    return d

spark.udf.register("udf_customhashing32adadf", customencoding, StringType())

spark.sql("SELECT udf_customhashing32adadf('test') as rowid").show(10, False)

You can implement it in the same way.
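
If you prefer the DataFrame API over SQL, the same function can also be wrapped with pyspark.sql.functions.udf and applied via withColumn; a rough equivalent (the DataFrame and column names here are just illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

customencoding_udf = udf(customencoding, StringType())

# Illustrative DataFrame with a single string column named "value".
df = spark.createDataFrame([("test",)], ["value"])
df.withColumn("rowid", customencoding_udf(df["value"])).show(10, False)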