
What I am trying to do is simply convert the fields year, month, day, hour, and minute (which are of type integer, as seen below) into a string type.

So I have a DataFrame df_src of type:

<class 'pyspark.sql.dataframe.DataFrame'>

and here is its schema:

root
 |-- src_ip: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)

I also declared a function earlier:

def parse_df_to_string(year, month, day, hour=0, minute=0):
    second = 0
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(year, month, day, hour, minute, second)

I also tested it, and it works like a charm:

print parse_df_to_string(2016, 10, 15, 21)
print type(parse_df_to_string(2016, 10, 15, 21))

2016-10-15 21:00:00
<type 'str'>

So I also did something similar to the Spark API examples with udf:

from pyspark.sql.functions import udf
u_parse_df_to_string = udf(parse_df_to_string)
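As a side note, the return type of the UDF can also be declared explicitly; StringType is the default when it is omitted (a minimal sketch, equivalent to the line above):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Equivalent registration with an explicit return type:
u_parse_df_to_string = udf(parse_df_to_string, StringType())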

where finally this query:

df_src.select('*', 
              u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
             ).show()

causes the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-126-770b587e10e6> in <module>()
     25 # Could not make this part wor..
     26 df_src.select('*',
---> 27         u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
     28              ).show()

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
    285         +---+-----+
    286         """
--> 287         print(self._jdf.showString(n, truncate))
    288 
    289     def __repr__(self):

/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()
    ...


    Py4JJavaError: An error occurred while calling o5074.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true])
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)

...

I tried many things; for example, I tried calling the method with only one parameter and argument, but it did not help.

One way that did work, though, is creating a new dataframe with a new column, as follows:

df_src_grp_hr_d = df_src.select('*', concat(
    col("year"), 
    lit("-"), 
    col("month"), 
    lit("-"), 
    col("day"),
    lit(" "),
    col("hour"),
    lit(":0")).alias('time'))`

and after that I could cast the column to a timestamp:

df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
    df_src_grp_hr_d['src_ip'],
    df_src_grp_hr_d['year'],
    df_src_grp_hr_d['month'],
    df_src_grp_hr_d['day'],
    df_src_grp_hr_d['hour'],
    df_src_grp_hr_d['time'].cast('timestamp'))
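Note that concat as written does not zero-pad single-digit months, days, or hours, so the resulting strings differ from what parse_df_to_string produces. If that matters, format_string can build the same zero-padded layout using only built-in functions (a sketch against the schema above, not tested on the original data):

from pyspark.sql.functions import format_string, col

# Zero-padded "yyyy-MM-dd HH:mm:ss" string built without a Python UDF:
df_src_grp_hr_d = df_src.select('*', format_string(
    "%04d-%02d-%02d %02d:%02d:00",
    col("year"), col("month"), col("day"),
    col("hour"), col("minute")).alias('time'))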
Are you sure it's not just a typo? Look at your error message: Cannot evaluate expression: parse_df_to_stringg. See the extra "g"? - paisanco
Good observation, paisanco. It was just another test I tried in the meantime with a function having only one parameter. I am sorry to have misled you; the exception is still there. - aks
Which is correct, minute or time? - Kenji Noguchi
I want to generate a new column time from the columns year, month, day, hour, and minute using the udf parse_df_to_string. - aks

1 Answer


Alright, I think I understand the problem. The cause is that my DataFrame simply had a lot of data loaded in memory, which made the show() action fail.

The way I see it now, what is causing the exception:

Py4JJavaError: An error occurred while calling o2108.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 

is really the df.show() action.

I could confirm that by executing the code snippet from: Convert pyspark string to date format

from datetime import datetime
from pyspark.sql.functions import col, udf, unix_timestamp
from pyspark.sql.types import DateType

# Creation of a dummy dataframe:
df1 = sqlContext.createDataFrame([("11/25/1991", "11/24/1991", "11/30/1991"),
                                  ("11/25/1391", "11/24/1992", "11/30/1992")], schema=['first', 'second', 'third'])

# Defining a user-defined function:
# This function converts the string cell into a date
# (note: '%m' is the strptime directive for month; '%M' would be minutes):
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

df = df1.withColumn('test', func(col('first')))

df.show()

df.printSchema()

which worked! But it still did not work with my DataFrame df_src.

The cause is that I am loading a very large amount of data into memory from my database server (over 8-9 million rows), and it seems that Spark is unable to execute the UDF when .show() (which displays 20 entries by default) is called on the results loaded in the DataFrame.

Even if show(n=1) is called, the same exception is thrown.

But if printSchema() is called, you will see that the new column has indeed been added.

One way to check whether the new column has been added is simply to call the action print dataFrame.take(10) instead, as sketched below.
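For instance (a sketch; dataFrame stands for any DataFrame that already has the UDF column selected, with Python 2 print syntax as elsewhere in this post):

dataFrame.printSchema()     # the new column appears in the schema
print dataFrame.take(10)    # take() returns a list of Row objects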

Finally, one way to make it work is to assign the result to a new dataframe, and not call .show() while calling the udf in a select(), as in:

df_to_string = df_src.select('*', 
          u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
         )

Then cache it (note the parentheses; cache is a method, so df_to_string.cache without them does nothing):

df_to_string.cache()
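Keep in mind that cache() is lazy; nothing is materialized until the next action runs, e.g.:

df_to_string.count()   # an action; forces evaluation and populates the cache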

Now .show() can be called with no issues:

df_to_string.show()
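Putting it all together, here is a minimal end-to-end sketch of the whole flow, including the cast back to a timestamp that the question was after (the column name time_str is my choice, and the schema is assumed to match the one shown in the question):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

u_parse_df_to_string = udf(parse_df_to_string, StringType())

# Build the string column, then cast it to a timestamp:
df_to_string = df_src.select('*',
    u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'],
                         df_src['hour'], df_src['minute']).alias('time_str'))
df_with_ts = df_to_string.withColumn('time', df_to_string['time_str'].cast('timestamp'))

df_with_ts.cache()
df_with_ts.show()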