0
votes

Edit : i don't feel it's duplicate as I'm not trying to concat data in a dataframe but to get an int (or string) value to use it in a string formatting. The simple concatenation adressed with concat_ws doesn't solve the problem as "96-3-12" will not be interpreted as a date. I have to transform it to 1996-03-12 or get the int values to pass them to a datetime.date function and then apply a datefiff function.

I'm trying to query a dataset in pyspark and calculate an approximate age from birth month and birth year

from pyspark.sql.types import IntegerType
presc_par_med = med.join(presc.groupBy(presc.chaiprat).agg(F.sum(presc.nbmol).alias("nb_mol"), \
                                              F.count(presc.nbmol).alias("nb_presc"), \
                                              F.countDistinct(presc.numano).alias("nb_pat"), \
                                              F.datediff(F.current_date(), "%d-%d-15" % (med.year.cast(IntegerType()), med.month.cast(IntegerType()))).alias("age")), \
                     med.pranum == presc.chaiprat)

But when i execute this, i have an error

TypeError: %d format: a number is required, not Column

Isn't this the cast's role ? I tried with %s and got a different error.

Edit : Differents tries, taking answers and comments in account (and modifying the original query which was wrong anyway) :

1. Trying with the concat_ws function to calculate a birth date.

presc_par_med = med.withColumn("age", F.datediff(F.current_date(), \
                                F.concat_ws("-", med.year, med.month, F.lit("15")))) \
                   .withColumn("birthdate", F.concat_ws("-", med.year, med.month, F.lit("15"))) \
                   .join(presc.groupBy(presc.chaiprat) \
                              .agg(F.sum(presc.nbmol).alias("nb_mol"), \
                                   F.count(presc.nbmol).alias("nb_presc"), \
                                   F.countDistinct(presc.numano).alias("nb_pat")), \
                            med.pranum == presc.chaiprat)

The output gives us this

[Row(id=94, year=52, month=3, [...], age=None, birthdate=u'52-3-15', [...], nb_mol=14514, nb_presc=3624, nb_pat=520)]

The date is well concatenated but, as the format in incorrect, the age is None. When I try to modify year and month to complete them with the century and a .zfill(2), i have the same problem than before : i can't reach any integer or string value...

2. Trying with a birthdate function to have a correct date

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import datetime

@udf("date")
def birthdate(year, month, day):
    return datetime.date(year, month, day)

presc_par_med = med.withColumn("age", F.datediff(F.current_date(), \
                                F.concat_ws("-", med.year, med.month, F.lit("15")))) \
                   .withColumn("birthdate", birthdate(med.year.cast("int"), med.month.cast("int"), 15)) \
                   .join(presc.groupBy(presc.chaiprat) \
                              .agg(F.sum(presc.nbmol).alias("nb_mol"), \
                                   F.count(presc.nbmol).alias("nb_presc"), \
                                   F.countDistinct(presc.numano).alias("nb_pat")), \
                            med.pranum == presc.chaiprat)

I've got an error Error :

---> 12 presc_par_med = ...
[...]
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)

3. lit for function

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import datetime

@udf("date")
def birthdate(year, month, day):
    return datetime.date(year, month, day)

presc_par_med = med.withColumn("age", F.datediff(F.current_date(), \
                                F.concat_ws("-", med.year, med.month, F.lit("15")))) \
                   .withColumn("birthdate", birthdate(med.year.cast("integer"), med.month.cast("integer"), F.lit(15))) \
                   .join(presc.groupBy(presc.chaiprat) \
                              .agg(F.sum(presc.nbmol).alias("nb_mol"), \
                                   F.countDistinct(presc.numano).alias("nb_pat")), \
                            med.pranum == presc.chaiprat)
presc_par_med.take(1)

Error :

Py4JJavaError: An error occurred while calling o2126.collectToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
[...]

  File "/usr/lib/spark/python/pyspark/worker.py", line 104, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/pyspark/worker.py", line 69, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "<ipython-input-90-1cbfa91d0947>", line 10, in datenaiss
TypeError: an integer is required
1

1 Answers

1
votes

Column expressions represent transformations of the logical execution plan not values. Casting is not different - maps one expression to another expression.

To operate on values using plain Python functions you could use udf:

from pyspark.sql.functions import udf

@udf("string")
def fifteenth(year, month):
    return "%d-%d-15" % (year, month)

or

import datetime

@udf("date")
def fifteenth(year, month):
    return datetime.date(year, month, 15)

and use it as

F.datediff(
    F.current_date(), 
    fifteenth(med.year.cast("integer"), med.month.cast("integer"))
)

but you really shouldn't. It is better to:

from pyspark.sql.functions.import concat_ws, lit

concat_ws("-", med.year, med.month, lit("15"))