
I am currently working with Spark Streaming and receiving data from Kafka as JSON. I convert my RDD to a DataFrame and register it as a table. When I run a query that references a column name that does not exist in the DataFrame, it throws an error like:

"'No such struct field currency in price, recipientId;'" HEre is my query val selectQuery = "lower(serials.brand) as brandname, lower(appname) as appname, lower(serials.pack) as packname, lower(serials.asset) as assetname, date_format(eventtime, 'yyyy-MM-dd HH:00:00') as eventtime, lower(eventname) as eventname, lower(client.OSName) as platform, lower(eventorigin) as eventorigin, meta.price as price, client.ip as ip, lower(meta.currency) as currency, cast(meta.total as int) as count" Here is my dataframe DataFrame[addedTime: bigint, appName: string, client: struct<ip:string>, eventName: string, eventOrigin: string, eventTime: string, geoLocation: string, location: string, meta: struct<period:string,total:string>, serials: struct<asset:string,brand:string,pack:string>, userId: string]> 

Now, my JSON is not strict, and at times some keys may not be present. How can I safely bypass this exception when the keys or columns are missing from the DataFrame?

2 Answers


You can use df.columns to check which columns exist before querying. You can also get column names and data types from df.schema, and log the full schema with df.printSchema().
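As a minimal sketch of that idea (assuming df is the DataFrame from the question), you can inspect df.schema for the nested field before building the select expression, and fall back to a null literal when it is absent:

import org.apache.spark.sql.types.StructType

// Does "meta.currency" exist in this batch's schema?
val hasCurrency = df.schema.fields
  .find(_.name == "meta")                                  // top-level "meta" column
  .map(_.dataType)
  .collect { case s: StructType => s.fieldNames.contains("currency") }
  .getOrElse(false)

// Only reference the field when it is actually present; otherwise select a null.
val currencyExpr =
  if (hasCurrency) "lower(meta.currency) as currency"
  else "cast(null as string) as currency"

// Build the rest of the select string the same way, then query as before.
val selectQuery = s"lower(appName) as appname, $currencyExpr"

The same pattern works for top-level columns by checking df.columns.contains("colName") directly.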



The only way I found was to define an explicit JSON schema and use it when parsing the JSON into a DataFrame:

val df = sqlContext.read.schema(schema).json(rdd)
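A minimal sketch of such a schema, based on the fields printed in the question plus the nested fields the query expects (meta.price, meta.currency, client.OSName); the exact types and nullability here are assumptions:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("addedTime", LongType),
  StructField("appName", StringType),
  StructField("client", StructType(Seq(
    StructField("ip", StringType),
    StructField("OSName", StringType)          // expected by the query, may be absent in the JSON
  ))),
  StructField("eventName", StringType),
  StructField("eventOrigin", StringType),
  StructField("eventTime", StringType),
  StructField("geoLocation", StringType),
  StructField("location", StringType),
  StructField("meta", StructType(Seq(
    StructField("period", StringType),
    StructField("total", StringType),
    StructField("price", StringType),          // expected by the query
    StructField("currency", StringType)        // expected by the query
  ))),
  StructField("serials", StructType(Seq(
    StructField("asset", StringType),
    StructField("brand", StringType),
    StructField("pack", StringType)
  ))),
  StructField("userId", StringType)
))

// Fields declared in the schema but missing from a given JSON record come back
// as null instead of raising "No such struct field".
val df = sqlContext.read.schema(schema).json(rdd)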
