Edit: I don't think this is a duplicate, as I'm not trying to concatenate data in a dataframe but to get an int (or string) value to use in string formatting. The simple concatenation addressed with concat_ws doesn't solve the problem, as "96-3-12" will not be interpreted as a date. I have to transform it to 1996-03-12, or get the int values to pass them to a datetime.date function, and then apply a datediff function.
I'm trying to query a dataset in PySpark and calculate an approximate age from birth month and birth year:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

presc_par_med = med.join(presc.groupBy(presc.chaiprat).agg(F.sum(presc.nbmol).alias("nb_mol"), \
    F.count(presc.nbmol).alias("nb_presc"), \
    F.countDistinct(presc.numano).alias("nb_pat"), \
    F.datediff(F.current_date(), "%d-%d-15" % (med.year.cast(IntegerType()), med.month.cast(IntegerType()))).alias("age")), \
    med.pranum == presc.chaiprat)
But when I execute this, I get an error:
TypeError: %d format: a number is required, not Column
Isn't this what the cast is for? I tried with %s and got a different error.
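A likely explanation, offered as a hedged reading of the error message: the cast only changes the Spark-side type of the column expression and still returns a Column, while the `%` operator is evaluated by Python on the driver before any row is read. The sketch below reproduces the same kind of failure without Spark. `FakeColumn` and `try_format` are hypothetical names invented for this illustration and are not part of pyspark.

```python
class FakeColumn(object):
    """Hypothetical stand-in for a lazy column expression (not a pyspark class)."""

    def __init__(self, name):
        self.name = name

    def cast(self, type_name):
        # Like a column cast, this returns another expression object,
        # not a Python int.
        return FakeColumn("CAST(%s AS %s)" % (self.name, type_name))


def try_format(value):
    """Return the formatted string, or the name of the exception raised."""
    try:
        return "%d-15" % value
    except TypeError as exc:
        return type(exc).__name__


print(try_format(3))                                 # a plain int formats fine
print(try_format(FakeColumn("month").cast("int")))   # an expression object does not
```

If this reading is right, the date string has to be built either from column functions or inside a UDF, where real Python values are available for each row.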
Edit: Different attempts, taking the answers and comments into account (and modifying the original query, which was wrong anyway):
1. Trying with the concat_ws function to calculate a birth date.
presc_par_med = med.withColumn("age", F.datediff(F.current_date(), \
F.concat_ws("-", med.year, med.month, F.lit("15")))) \
.withColumn("birthdate", F.concat_ws("-", med.year, med.month, F.lit("15"))) \
.join(presc.groupBy(presc.chaiprat) \
.agg(F.sum(presc.nbmol).alias("nb_mol"), \
F.count(presc.nbmol).alias("nb_presc"), \
F.countDistinct(presc.numano).alias("nb_pat")), \
med.pranum == presc.chaiprat)
The output looks like this:
[Row(id=94, year=52, month=3, [...], age=None, birthdate=u'52-3-15', [...], nb_mol=14514, nb_presc=3624, nb_pat=520)]
The date is concatenated correctly but, as the format is incorrect, the age is None. When I try to modify year and month to complete them with the century and a .zfill(2), I have the same problem as before: I can't get at any integer or string value...
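The target format can be sketched in plain Python first. This assumes every two-digit year belongs to the 1900s, which fits the 52 and 96 examples here but is an assumption about the rest of the data. `iso_birthdate` is a hypothetical helper name. It only works on real integers, so inside a query the same padding would have to come from column functions (F.lpad and F.to_date look like candidates, untested against this dataset) or from a UDF.

```python
def iso_birthdate(year, month, day=15, century=1900):
    """Build a zero-padded ISO date string from a two-digit year and a month.

    Assumption: the two-digit year belongs to the given century (1900 by default).
    """
    return "%04d-%02d-%02d" % (century + year, month, day)


print(iso_birthdate(52, 3))       # 1952-03-15
print(iso_birthdate(96, 3, 12))   # 1996-03-12
```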
2. Trying with a birthdate function to have a correct date
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import datetime
@udf("date")
def birthdate(year, month, day):
    return datetime.date(year, month, day)
presc_par_med = med.withColumn("age", F.datediff(F.current_date(), \
F.concat_ws("-", med.year, med.month, F.lit("15")))) \
.withColumn("birthdate", birthdate(med.year.cast("int"), med.month.cast("int"), 15)) \
.join(presc.groupBy(presc.chaiprat) \
.agg(F.sum(presc.nbmol).alias("nb_mol"), \
F.count(presc.nbmol).alias("nb_presc"), \
F.countDistinct(presc.numano).alias("nb_pat")), \
med.pranum == presc.chaiprat)
I get this error:
---> 12 presc_par_med = ...
[...]
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
3. Using lit for the constant argument of the function
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import datetime
@udf("date")
def birthdate(year, month, day):
    return datetime.date(year, month, day)
presc_par_med = med.withColumn("age", F.datediff(F.current_date(), \
F.concat_ws("-", med.year, med.month, F.lit("15")))) \
.withColumn("birthdate", birthdate(med.year.cast("integer"), med.month.cast("integer"), F.lit(15))) \
.join(presc.groupBy(presc.chaiprat) \
.agg(F.sum(presc.nbmol).alias("nb_mol"), \
F.countDistinct(presc.numano).alias("nb_pat")), \
med.pranum == presc.chaiprat)
presc_par_med.take(1)
Error:
Py4JJavaError: An error occurred while calling o2126.collectToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
[...]
File "/usr/lib/spark/python/pyspark/worker.py", line 104, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/usr/lib/spark/python/pyspark/worker.py", line 69, in <lambda>
return lambda *a: toInternal(f(*a))
File "<ipython-input-90-1cbfa91d0947>", line 10, in datenaiss
TypeError: an integer is required
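One possible cause of this last TypeError, which the traceback alone does not confirm, is that year or month is null in some rows: the UDF would then receive None, and datetime.date rejects it. A minimal sketch of a more defensive function body follows. `safe_birthdate` is a hypothetical name, and the 1900 century default is an assumption about the data.

```python
import datetime


def safe_birthdate(year, month, day=15, century=1900):
    """Return a date, or None when an input is missing or out of range.

    Assumption: two-digit years belong to the given century (1900 by default).
    """
    if year is None or month is None or day is None:
        return None
    try:
        return datetime.date(century + int(year), int(month), int(day))
    except (TypeError, ValueError):
        # Non-numeric input or an impossible date such as month 13.
        return None


print(safe_birthdate(52, 3))     # 1952-03-15
print(safe_birthdate(None, 3))   # None
```

It could be wrapped with @udf("date") in the same way as the birthdate function above, so that rows with missing values yield a null birthdate instead of failing the whole job.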