7
votes

I have a really strange error with spark dataframes which causes a string to be evaluated as a timestamp.

Here is my setup code:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

new_schema = StructType([StructField("item_id", StringType(), True),
                         StructField("date", TimestampType(), True),
                         StructField("description", StringType(), True)
                        ])

df = sqlContext.createDataFrame([Row(description='description', date=datetime.utcnow(), item_id='id_string')], new_schema)

this gives me the following error:

AttributeError Traceback (most recent call last) in () ----> 1 df = sqlContext.createDataFrame([Row(description='hey', date=datetime.utcnow(), item_id='id_string')], new_schema)

/home/florian/spark/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema) 307 Py4JJavaError: ... 308 """ --> 309 return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema) 310 311 @since(1.3)

/home/florian/spark/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema) 522 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) 523 else: --> 524 rdd, schema = self._createFromLocal(map(prepare, data), schema) 525 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd()) 526 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/florian/spark/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema) 397 398 # convert python objects to sql data --> 399 data = [schema.toInternal(row) for row in data] 400 return self._sc.parallelize(data), schema 401

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj) 574 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields)) 575 elif isinstance(obj, (tuple, list)): --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 577 elif hasattr(obj, "dict"): 578 d = obj.dict

/home/florian/spark/python/pyspark/sql/types.pyc in ((f, v)) 574 return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields)) 575 elif isinstance(obj, (tuple, list)): --> 576 return tuple(f.toInternal(v) for f, v in zip(self.fields, obj)) 577 elif hasattr(obj, "dict"): 578 d = obj.dict

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, obj) 434 435 def toInternal(self, obj): --> 436 return self.dataType.toInternal(obj) 437 438 def fromInternal(self, obj):

/home/florian/spark/python/pyspark/sql/types.pyc in toInternal(self, dt) 188 def toInternal(self, dt): 189 if dt is not None: --> 190 seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo 191 else time.mktime(dt.timetuple())) 192 return int(seconds * 1e6 + dt.microsecond)

AttributeError: 'str' object has no attribute 'tzinfo'

this looks as if a string was passed to TimestampType.toInternal()

the really weird thing is that this data frame creates the same error:

df = sqlContext.createDataFrame([Row(description='hey', date=None, item_id='id_string')], new_schema)

while this one works:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id='id_string')], new_schema)

and this one works as well:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)

For me, this now means that the pyspark somehow puts the value from "item_id" into the column "date" and therefore creates this error. Did I do something wrong? Is this a bug within data frames?

info: I am using pyspark 2.0.1

Edit:

df = sqlContext.createDataFrame([Row(description=None, date=datetime.now(), item_id=None)], new_schema)
df.first()

Row(item_id=u'java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=1,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=3,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=3,HOUR_OF_DAY=15,MINUTE=19,SECOND=30,MILLISECOND=85,ZONE_OFFSET=?,DST_OFFSET=?]', date=None, description=None)

2

2 Answers

10
votes

When you create a Row object, the fields are sorted alphabetically (http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.Row), so when you are creating the Row(description, date, item_id) object, it will be ordered as (date, description, item_id).

As your schema is ordered as StringType, TimestampType, StringType, when creating a DataFrame with this Row and schema, Spark will map what is in date to a StringType, what is in description to a TimestampType and item_id to a StringType.

Passing a Timestamp (in datetime format) to a StringType does not cause an error, but passing a String to a TimestampType does, since it asks for the tzinfo attribute, which, as the error stated, the String object does not has it.

Also, the reason the dataframes that worked for you actually worked is because None is being passed to the TimestampType in your schema, which is an acceptable value.

6
votes

Basing this off of the answer above from @rafael-zanetti. You can do the following to sort your columns:

new_schema = [StructField("item_id", StringType(), True),
                     StructField("date", TimestampType(), True),
                     StructField("description", StringType(), True)]
new_schema = StructType(sorted(new_schema, key=lambda f: f.name))