1 vote

I am currently working with Spark Streaming, consuming JSON data from Kafka. I convert my RDD to a DataFrame and register it as a table. When I then fire a query that references a column which does not exist in the DataFrame, it throws an error like

"'No such struct field currency in price, recipientId;'"

Here is my query:
val selectQuery = "lower(serials.brand) as brandname, lower(appname) as appname, " +
  "lower(serials.pack) as packname, lower(serials.asset) as assetname, " +
  "date_format(eventtime, 'yyyy-MM-dd HH:00:00') as eventtime, " +
  "lower(eventname) as eventname, lower(client.OSName) as platform, " +
  "lower(eventorigin) as eventorigin, meta.price as price, client.ip as ip, " +
  "lower(meta.currency) as currency, cast(meta.total as int) as count"

Here is my DataFrame:

DataFrame[addedTime: bigint, appName: string, client: struct<ip:string>, eventName: string, eventOrigin: string, eventTime: string, geoLocation: string, location: string, meta: struct<period:string,total:string>, serials: struct<asset:string,brand:string,pack:string>, userId: string]

My JSON is not strict, and at times some keys may not be present. How can I safely bypass this exception when the keys or columns are not there in the DataFrame?


2 Answers

2 votes

You can use df.columns to check which columns exist. There are several ways to get column names and data types, for example df.schema, and you can also print the schema with df.printSchema().
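
For nested fields such as meta.currency, df.columns only covers the top level, so you need to walk the schema. Below is a minimal sketch; the helpers hasField and safeExpr are hypothetical names, not part of the Spark API. The idea is to substitute a NULL literal for any missing field so the query never fails:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}

// Returns true if a (possibly nested) field path such as "meta.currency"
// exists in the schema.
def hasField(schema: StructType, path: String): Boolean = {
  def step(dt: DataType, parts: List[String]): Boolean = parts match {
    case Nil => true
    case head :: tail => dt match {
      case st: StructType =>
        st.fields.find(_.name.equalsIgnoreCase(head)) match {
          case Some(f) => step(f.dataType, tail)
          case None    => false
        }
      case _ => false
    }
  }
  step(schema, path.split("\\.").toList)
}

// Build a select expression that falls back to NULL when the field is absent.
def safeExpr(df: DataFrame, path: String, alias: String): String =
  if (hasField(df.schema, path)) s"lower($path) as $alias"
  else s"cast(null as string) as $alias"

You would then build the query from these guarded expressions, e.g. df.selectExpr(safeExpr(df, "meta.currency", "currency"), ...), instead of a fixed string.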

0 votes

The only way I found was to define a schema for the JSON and then use that schema to parse the JSON into a DataFrame:

val df = sqlContext.read.schema(schema).json(rdd)
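
For completeness, here is a minimal sketch of such a schema using the field names from the question (the exact types are assumptions). Declaring the optional keys as nullable fields makes Spark fill them with null whenever they are missing from the JSON, so queries over them no longer throw:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("appName",   StringType, nullable = true),
  StructField("eventName", StringType, nullable = true),
  StructField("eventTime", StringType, nullable = true),
  StructField("client", StructType(Seq(
    StructField("ip",     StringType, nullable = true),
    StructField("OSName", StringType, nullable = true)    // may be absent in the JSON
  )), nullable = true),
  StructField("meta", StructType(Seq(
    StructField("price",    StringType, nullable = true), // may be absent in the JSON
    StructField("currency", StringType, nullable = true), // may be absent in the JSON
    StructField("total",    StringType, nullable = true)
  )), nullable = true)
))

// Missing keys now come back as null instead of raising
// "No such struct field ..." at query time.
val df = sqlContext.read.schema(schema).json(rdd)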