2
votes

In the middle of my project I got stuck on an UnsupportedOperationException. Here is my scenario: I created a UDF called filter and registered it as fnGetChargeInd. The function takes four parameters: a timestamp (a unicode string in the source data, already converted to a datetime in the query), a string frequency, a string begmonth, and a string testperiod (the current period). Its result is used to calculate ChargeAmt, and it returns an integer value. Here is my UDF code:

def filter(startdate, frequency, begmonth, testperiod):
    startdatestring = startdate.strftime("%Y-%m-%d")
    # print "startdatestring->", startdatestring
    startdateyearstring = startdatestring[0:4]
    startdatemonthstring = startdatestring[5:7]
    # print "startdateyearstring->", startdateyearstring
    startdateyearint = int(startdateyearstring)
    startdatemonthint = int(startdatemonthstring)
    # print "startdateyearint is->", startdateyearint
    # print "startdateyearinttype", type(startdateyearint)
    currYear = startdateyearint
    currMonth = startdatemonthint
    currperiod = startdateyearstring + startdatemonthstring
    if (frequency == 'M'):
        return 1
    if (frequency == 'S' or frequency == 'A' and begmonth != None):
        currMonth = int(begmonth)
        print"in if statement", currMonth
    # check nextperiod calculation
    if (currperiod == testperiod):
        return 1
    if (currperiod > testperiod):
        return 0
    if (frequency == 'Q'):
        currMonth = currMonth + 3
    if (frequency == 'S'):
        currMonth = currMonth + 1
    if (currMonth > 12):
        currMonth = currMonth - 12
        currYear = currYear + 1
    return 0
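
A side note on the function above: as written, currperiod is built before currMonth and currYear are adjusted, so those adjustments never reach the return value. Below is a minimal sketch of what the function effectively computes. The name charge_indicator is hypothetical, and the sketch leaves out the print and the int(begmonth) call (which, because `and` binds tighter than `or`, is reached for frequency 'S' even when begmonth is None).

```python
import datetime


def charge_indicator(startdate, frequency, testperiod):
    """Hypothetical condensed equivalent of the UDF body above."""
    # Monthly charges always apply.
    if frequency == 'M':
        return 1
    # Otherwise the charge applies only when the start date falls
    # in the tested period, expressed as YYYYMM.
    currperiod = startdate.strftime("%Y%m")
    return 1 if currperiod == testperiod else 0


print(charge_indicator(datetime.datetime(2017, 1, 15), 'M', '201703'))
print(charge_indicator(datetime.datetime(2017, 3, 15), 'Q', '201703'))
print(charge_indicator(datetime.datetime(2017, 1, 15), 'Q', '201703'))
```

If the quarterly and semi-annual branches were meant to roll the period forward, currperiod would need to be rebuilt after those adjustments.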

And this is my timestamp conversion code, which parses the unicode string into a datetime:

import datetime

from pyspark.sql.types import IntegerType, TimestampType

def StringtoTimestamp(datetext):
    # Pass NULL values through unchanged
    if datetext is None:
        return None
    return datetime.datetime.strptime(datetext, "%b %d %Y %H:%M:%S:%f%p")

spark.udf.register('TimestampConvert',lambda datetext:StringtoTimestamp(datetext),TimestampType())

spark.udf.register("fnGetChargeInd",lambda x,y,z,timeperiod:filter(x,y,z,timeperiod),IntegerType())
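
To see what the format string in the conversion function does, here is a small check in plain Python. The sample string is an assumption, since the post does not show raw EFFDATE values, and the month and AM/PM names assume an English or C locale. One thing worth knowing: per the Python documentation, %p only affects the parsed hour when the hour is read with %I, so with %H the AM/PM marker is matched but otherwise ignored.

```python
import datetime

FMT_24H = "%b %d %Y %H:%M:%S:%f%p"  # format used in the post
FMT_12H = "%b %d %Y %I:%M:%S:%f%p"  # hypothetical variant for 12-hour input

sample = "Jan 15 2017 02:30:00:000PM"  # assumed sample value

parsed_h = datetime.datetime.strptime(sample, FMT_24H)
parsed_i = datetime.datetime.strptime(sample, FMT_12H)

# With %H the PM marker is ignored; with %I it shifts the hour.
print(parsed_h.hour)
print(parsed_i.hour)
```

If the source values really use a 12-hour clock, the %I variant may be the one that is wanted; if they use a 24-hour clock, the format in the post is fine.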

After this, I ran the query that calculates ChargeAmt:

spark.sql("select b.ENTITYID as ENTITYID, cm.BLDGID as BldgID,cm.LEASID as LeaseID,coalesce(l.SUITID,(select EmptyDefault from EmptyDefault)) as SuiteID,(select CurrDate from CurrDate) as TxnDate,cm.INCCAT as IncomeCat,'??' as SourceCode,(Select CurrPeriod from CurrPeriod)as Period,coalesce(case when cm.DEPARTMENT ='@' then 'null' else cm.DEPARTMENT end, null) as Dept,'Lease' as ActualProjected ,fnGetChargeInd(TimestampConvert(cm.EFFDATE),cm.FRQUENCY,cm.BEGMONTH,('select CurrPeriod from CurrPeriod'))*coalesce (cm.AMOUNT,0) as  ChargeAmt,0 as OpenAmt,cm.CURRCODE as CurrencyCode,case when ('PERIOD.DATACLSD') is null then 'Open' else 'Closed' end as GLClosedStatus,'Unposted'as GLPostedStatus ,'Unpaid' as PaidStatus,cm.FRQUENCY as Frequency,0 as RetroPD from CMRECC cm join BLDG b on cm.BLDGID =b.BLDGID join LEAS l on cm.BLDGID =l.BLDGID and cm.LEASID =l.LEASID and (l.VACATE is null or l.VACATE >= ('select CurrDate from CurrDate')) and (l.EXPIR >= ('select CurrDate from CurrDate') or l.EXPIR < ('select RunDate from RunDate')) left outer join PERIOD on b.ENTITYID =  PERIOD.ENTITYID and ('select CurrPeriod from CurrPeriod')=PERIOD.PERIOD where ('select CurrDate from CurrDate')>=cm.EFFDATE  and (select CurrDate from CurrDate) <= coalesce(cm.EFFDATE,cast(date_add(( select min(cm2.EFFDATE) from CMRECC cm2 where cm2.BLDGID = cm.BLDGID and cm2.LEASID = cm.LEASID and cm2.INCCAT = cm.INCCAT and 'cm2.EFFDATE' > 'cm.EFFDATE'),-1) as timestamp)  ,case when l.EXPIR <(select RunDate from RunDate)then (Select RunDate from RunDate) else l.EXPIR end)").show()

It calculates ChargeAmt correctly.

I saved this result as the temporary table Fact_Temp. Now the problem arises: I want to query a filtered table, i.e. the data that remains after removing the rows where ActualProjected = 'Lease' and ChargeAmt = 0.

spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease' and ChargeAmt='0')").show()

It gives me this exception:

java.lang.UnsupportedOperationException: Cannot evaluate expression: fnGetChargeInd(TimestampConvert(input[0, string, true]), input[1, string, true], input[2, string, true], select CurrPeriod from CurrPeriod)

My guess is that ChargeAmt is not holding a value, because the query works fine if I drop that condition:

spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease')").show()

This gives me the expected empty table. Logically, I would think the ChargeAmt values are stored in the table once they are calculated, and since I registered that table, the values are saved. So I have no idea why the function is being called when I query the saved table. I have already seen the Stack Overflow post "UnsupportedOperationException: Cannot evalute expression: .. when adding new column withColumn() and udf()", but I think my case is different. I also tried printSchema on the DataFrame, and it only shows the schema of this temp table.
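
For comparison, the row filter in the failing query can usually be written as a plain WHERE clause instead of EXCEPT. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL, with a made-up two-column Fact_Temp, only to compare the two forms. Whether this rewrite avoids the exception in Spark 2.0 is not something the post confirms. Note that EXCEPT also removes duplicate rows, and the two forms treat NULLs differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Made-up, simplified stand-in for the Fact_Temp table in the post.
conn.execute("CREATE TABLE Fact_Temp (ActualProjected TEXT, ChargeAmt INTEGER)")
conn.executemany(
    "INSERT INTO Fact_Temp VALUES (?, ?)",
    [("Lease", 0), ("Lease", 100), ("Lease", 100), ("Budget", 0)],
)

# Form used in the post: set difference (also de-duplicates).
except_rows = conn.execute(
    "SELECT * FROM Fact_Temp "
    "EXCEPT SELECT * FROM Fact_Temp "
    "WHERE ActualProjected = 'Lease' AND ChargeAmt = 0"
).fetchall()

# Alternative form: negate the condition directly (keeps duplicates).
where_rows = conn.execute(
    "SELECT * FROM Fact_Temp "
    "WHERE NOT (ActualProjected = 'Lease' AND ChargeAmt = 0)"
).fetchall()

print(sorted(except_rows))
print(sorted(where_rows))
```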

How can I solve this problem? Am I missing something in my code? Any guidance is highly appreciated. I am using PySpark 2.0. Thanks in advance, Kalyan



1 Answer

5
votes

So far I have figured out that this is a Spark 2.0 bug. The following issue solved my problem: https://issues.apache.org/jira/browse/SPARK-17100

I moved from 2.0 to 2.1.0 and it worked for me.
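
A quick way to check whether an environment is already on a release at or past the one that worked here is to compare the version string. This is a minimal sketch: the helper name is hypothetical, the 2.1 threshold comes from the upgrade described above rather than from the JIRA ticket's fix versions, and in a live session the string would come from spark.version.

```python
def at_least_spark_2_1(version):
    """Hypothetical helper: True if a version string like '2.1.0' is >= 2.1."""
    major, minor = (int(part) for part in version.split(".")[:2])
    return (major, minor) >= (2, 1)


# In a real session: at_least_spark_2_1(spark.version)
print(at_least_spark_2_1("2.0.2"))
print(at_least_spark_2_1("2.1.0"))
```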