In the middle of my project I got stuck with an UnsupportedOperationException. Here is my scenario: I have created a UDF called `filter` and registered it as `fnGetChargeInd`. This function takes four parameters: a unicode timestamp (already converted to datetime type by an earlier query), a string frequency, a string begmonth, and a string currperiod. From these it calculates ChargeAmt and returns an integer value. Here is my UDF code:
def filter(startdate, frequency, begmonth, testperiod):
    startdatestring = startdate.strftime("%Y-%m-%d")
    # print "startdatestring->", startdatestring
    startdateyearstring = startdatestring[0:4]
    startdatemonthstring = startdatestring[5:7]
    # print "startdateyearstring->", startdateyearstring
    startdateyearint = int(startdateyearstring)
    startdatemonthint = int(startdatemonthstring)
    # print "startdateyearint is->", startdateyearint
    # print "startdateyearinttype", type(startdateyearint)
    currYear = startdateyearint
    currMonth = startdatemonthint
    currperiod = startdateyearstring + startdatemonthstring
    if frequency == 'M':
        return 1
    if (frequency == 'S' or frequency == 'A') and begmonth is not None:
        currMonth = int(begmonth)
        print "in if statement", currMonth
    # check nextperiod calculation
    if currperiod == testperiod:
        return 1
    if currperiod > testperiod:
        return 0
    if frequency == 'Q':
        currMonth = currMonth + 3
    if frequency == 'S':
        currMonth = currMonth + 1
    if currMonth > 12:
        currMonth = currMonth - 12
        currYear = currYear + 1
    return 0
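The month-rollover step at the end of the UDF can be checked on its own. Below is a minimal standalone sketch of just that step; the helper name `advance_month` is mine for illustration and is not part of the UDF:

```python
def advance_month(curr_year, curr_month, step):
    # Mirror of the rollover in the UDF: advance the month by `step`
    # (3 for 'Q', 1 for 'S') and wrap past December into the next year.
    curr_month += step
    if curr_month > 12:
        curr_month -= 12
        curr_year += 1
    return curr_year, curr_month

# A quarterly step from November 2016 wraps into February 2017.
print(advance_month(2016, 11, 3))
```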
And this is my timestamp-conversion code for parsing the unicode string as a datetime:
import datetime
from pyspark.sql.types import TimestampType, IntegerType

def StringtoTimestamp(datetext):
    if datetext is None:
        return None
    return datetime.datetime.strptime(datetext, "%b %d %Y %H:%M:%S:%f%p")

spark.udf.register('TimestampConvert', lambda datetext: StringtoTimestamp(datetext), TimestampType())
spark.udf.register("fnGetChargeInd", lambda x, y, z, timeperiod: filter(x, y, z, timeperiod), IntegerType())
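To rule out the parse format itself, the same conversion can be sanity-checked on the driver with plain `datetime`, outside Spark. The sample string below is made up to match the `"%b %d %Y %H:%M:%S:%f%p"` pattern:

```python
import datetime

def string_to_timestamp(datetext):
    # Same parse as StringtoTimestamp above, run outside Spark.
    if datetext is None:
        return None
    return datetime.datetime.strptime(datetext, "%b %d %Y %H:%M:%S:%f%p")

print(string_to_timestamp("Jan 15 2016 10:30:00:000000AM"))
print(string_to_timestamp(None))
```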
Now, after this, I queried for the ChargeAmt calculation table:
spark.sql("""
select b.ENTITYID as ENTITYID,
       cm.BLDGID as BldgID,
       cm.LEASID as LeaseID,
       coalesce(l.SUITID, (select EmptyDefault from EmptyDefault)) as SuiteID,
       (select CurrDate from CurrDate) as TxnDate,
       cm.INCCAT as IncomeCat,
       '??' as SourceCode,
       (select CurrPeriod from CurrPeriod) as Period,
       coalesce(case when cm.DEPARTMENT = '@' then 'null' else cm.DEPARTMENT end, null) as Dept,
       'Lease' as ActualProjected,
       fnGetChargeInd(TimestampConvert(cm.EFFDATE), cm.FRQUENCY, cm.BEGMONTH, ('select CurrPeriod from CurrPeriod')) * coalesce(cm.AMOUNT, 0) as ChargeAmt,
       0 as OpenAmt,
       cm.CURRCODE as CurrencyCode,
       case when ('PERIOD.DATACLSD') is null then 'Open' else 'Closed' end as GLClosedStatus,
       'Unposted' as GLPostedStatus,
       'Unpaid' as PaidStatus,
       cm.FRQUENCY as Frequency,
       0 as RetroPD
from CMRECC cm
join BLDG b on cm.BLDGID = b.BLDGID
join LEAS l on cm.BLDGID = l.BLDGID and cm.LEASID = l.LEASID
     and (l.VACATE is null or l.VACATE >= ('select CurrDate from CurrDate'))
     and (l.EXPIR >= ('select CurrDate from CurrDate') or l.EXPIR < ('select RunDate from RunDate'))
left outer join PERIOD on b.ENTITYID = PERIOD.ENTITYID
     and ('select CurrPeriod from CurrPeriod') = PERIOD.PERIOD
where ('select CurrDate from CurrDate') >= cm.EFFDATE
  and (select CurrDate from CurrDate) <= coalesce(
        cm.EFFDATE,
        cast(date_add((select min(cm2.EFFDATE) from CMRECC cm2
                       where cm2.BLDGID = cm.BLDGID and cm2.LEASID = cm.LEASID
                         and cm2.INCCAT = cm.INCCAT and 'cm2.EFFDATE' > 'cm.EFFDATE'), -1) as timestamp),
        case when l.EXPIR < (select RunDate from RunDate) then (select RunDate from RunDate) else l.EXPIR end)
""").show()
It calculates ChargeAmt perfectly. I saved this result in a temporary table, Fact_Temp. Now the PROBLEM ARISES: I want to query a filtered table, i.e. the data left after deleting the rows where ActualProjected = 'Lease' and ChargeAmt = 0:
spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease' and ChargeAmt='0')").show()
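The row-level intent of that `except` query is just "drop the rows where both conditions hold". A plain-Python sketch of the same predicate, with made-up rows standing in for Fact_Temp, looks like this:

```python
# Hypothetical rows standing in for Fact_Temp (illustration only).
rows = [
    {"ActualProjected": "Lease",  "ChargeAmt": 0},
    {"ActualProjected": "Lease",  "ChargeAmt": 250},
    {"ActualProjected": "Actual", "ChargeAmt": 0},
]

# Keep everything except Lease rows with a zero charge.
kept = [r for r in rows
        if not (r["ActualProjected"] == "Lease" and r["ChargeAmt"] == 0)]
print(kept)
```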
But it gives me this exception:
java.lang.UnsupportedOperationException: Cannot evaluate expression: fnGetChargeInd(TimestampConvert(input[0, string, true]), input[1, string, true], input[2, string, true], select CurrPeriod from CurrPeriod)
What I have figured out is that ChargeAmt is the culprit, because if I run the query without that condition it works fine:
spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease')").show()
This gives me the expected empty table. Logically, I would think the ChargeAmt values were computed and stored in the table when I registered it, so the values should be saved; when I query the saved table, I have no idea why the function is being called again. I have already seen the Stack Overflow post UnsupportedOperationException: Cannot evalute expression: .. when adding new column withColumn() and udf(), but my case seems different. I have tried printSchema() on the dataframe and I only see the schema of this temp table.
How can I solve this problem? Any guidance is highly appreciated; am I missing something in my code? Kindly help me. I am using PySpark 2.0. Thanks in advance, Kalyan.