I wrote a function that reads JSON from a string column and infers its schema, and I use that function in Spark streaming. It fails with the error below. The same code works when I create the schema first and then pass that schema to from_json, but it does not work when I infer the schema inline in a single expression. How can I fix it?

    def processBatch(microBatchOutputDF: DataFrame, batchId: Long) {
      TOPICS.split(',').foreach(topic => {
        var TableName = topic.split('.').last.toUpperCase
        var df = microBatchOutputDF
        /*var schema = schema_of_json(df
          .select($"value")
          .filter($"topic".contains(topic))
          .as[String]
        )*/
        var jsonDataDf = df.filter($"topic".contains(topic))
          .withColumn("jsonData", from_json($"value", schema_of_json(lit($"value".as[String])), scala.collection.immutable.Map[String, String]().asJava))
        var srcTable = jsonDataDf
          .select(col(s"jsonData.payload.after.*"), $"offset", $"timestamp")

        srcTable
          .select(srcTable.columns.map(c => col(c).cast(StringType)) : _*)
          .write
          .mode("append").format("delta").save("/mnt/datalake/raw/kafka/" + TableName)

        spark.sql(s"""CREATE TABLE IF NOT EXISTS kafka_raw.$TableName USING delta LOCATION '/mnt/datalake/raw/kafka/$TableName'""")
      })
    }

Spark streaming code

    import org.apache.spark.sql.streaming.Trigger

    val StreamingQuery = InputDf
      .select("*")
      .writeStream.outputMode("update")
      .option("queryName", "StreamingQuery")
      .foreachBatch(processBatch _)
      .start()

Error: org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json/schema_of_csv functions instead of schema_of_json(value)

2 Answers

Error: org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json/schema_of_csv functions instead of schema_of_json(value)

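The error comes from the from_json() function. Its schema argument must be a constant: either a DDL string literal or the output of schema_of_json applied to a literal JSON string. In the question, schema_of_json is applied to the value column, so the schema would differ from row to row and Spark rejects it.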

Syntax: from_json(jsonStr, schema[, options]) returns a struct value with the given jsonStr and schema.

See the examples below:

    > SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
     {"a":1,"b":0.8}
    > SELECT from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
     {"time":2015-08-26 00:00:00}

Reference: https://docs.databricks.com/sql/language-manual/functions/from_json.html
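The same two options can be sketched with the Scala DataFrame API used in the question. This is a minimal illustration, not the asker's pipeline: the object name, the helper names `parseWithDdl` and `parseWithSample`, the local session, and the sample document are all assumptions made for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

// Hypothetical helper object, for illustration only.
object FromJsonSchemaSketch {

  // Option 1: the schema is a DDL string literal.
  def parseWithDdl(df: DataFrame): DataFrame =
    df.select(
      from_json(col("value"), "a INT, b DOUBLE", Map.empty[String, String]).as("jsonData"))

  // Option 2: the schema is inferred from one literal sample document.
  // lit(sample) is a constant, so schema_of_json can be evaluated at analysis time.
  def parseWithSample(df: DataFrame, sample: String): DataFrame =
    df.select(from_json(col("value"), schema_of_json(lit(sample))).as("jsonData"))

  def main(args: Array[String]): Unit = {
    // Local session for the sketch; a notebook would already provide `spark`.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("from-json-sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed sample data standing in for the Kafka `value` column.
    val sample = """{"a":1,"b":0.8}"""
    val df = Seq(sample).toDF("value")

    parseWithDdl(df).select("jsonData.*").show()
    parseWithSample(df, sample).select("jsonData.*").show()

    spark.stop()
  }
}
```

In both variants the schema is fixed before any row is read. Passing `schema_of_json(col("value"))` instead would reproduce the AnalysisException from the question, because the argument is no longer a literal.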


This is how I solved it. I created a filtered DataFrame from the Kafka output DataFrame and applied the same logic to it as before. The problem with inferring the schema while reading is that from_json doesn't know which row to use out of all the rows in the DataFrame.

    def processBatch(microBatchOutputDF: DataFrame, batchId: Long) {
      TOPICS.split(',').foreach(topic => {
        var TableName = topic.split('.').last.toUpperCase
        var df = microBatchOutputDF.where(col("topic") === topic)
        var schema = schema_of_json(df
          .select($"value")
          .filter($"topic".contains(topic))
          .as[String]
        )
        var jsonDataDf = df.withColumn("jsonData", from_json($"value", schema, scala.collection.immutable.Map[String, String]().asJava))
        var srcTable = jsonDataDf
          .select(col(s"jsonData.payload.after.*"), $"offset", $"timestamp")

        srcTable
          .select(srcTable.columns.map(c => col(c).cast(StringType)) : _*)
          .write
          .mode("append").format("delta").save("/mnt/datalake/raw/kafka/" + TableName)

        spark.sql(s"""CREATE TABLE IF NOT EXISTS kafka_raw.$TableName USING delta LOCATION '/mnt/datalake/raw/kafka/$TableName'""")
      })
    }
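The "pick one row per topic" idea can also be written so that the sample is an ordinary string on the driver, since schema_of_json accepts a String or a literal Column. The following is a hedged sketch under stated assumptions, not the exact code above: the object name, the helper `parseTopic`, the topic names, and the sample payloads are hypothetical, and it assumes that one row is representative of everything on the topic.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}

// Hypothetical helper object, for illustration only.
object PerTopicSchemaSketch {

  // Returns the rows of `topic` with `value` parsed into `jsonData`,
  // or None when the micro-batch holds no non-null value for that topic.
  def parseTopic(batch: DataFrame, topic: String): Option[DataFrame] = {
    val topicDf = batch.where(col("topic") === topic)

    // Pull a single sample document to the driver. The cast also covers
    // Kafka sources where `value` is still binary.
    val sample: Option[String] = topicDf
      .where(col("value").isNotNull)
      .select(col("value").cast("string"))
      .head(1)
      .headOption
      .map(_.getString(0))

    // The sample is now a literal, so from_json receives a constant schema.
    sample.map { json =>
      topicDf.withColumn(
        "jsonData",
        from_json(col("value").cast("string"), schema_of_json(lit(json))))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("per-topic-schema-sketch")
      .getOrCreate()
    import spark.implicits._

    // Assumed micro-batch with two topics whose payloads differ.
    val batch = Seq(
      ("db.orders", """{"payload":{"after":{"id":1,"status":"NEW"}}}"""),
      ("db.users", """{"payload":{"after":{"name":"Ann"}}}""")
    ).toDF("topic", "value")

    parseTopic(batch, "db.orders")
      .foreach(_.select("jsonData.payload.after.*").show())

    spark.stop()
  }
}
```

Inside `processBatch`, `parseTopic(microBatchOutputDF, topic)` would take the place of the schema and `withColumn` lines, and an empty result skips the write for that topic. Fields missing from the sampled row are dropped from the parsed struct; if that matters, `spark.read.json` on the topic's values (as a `Dataset[String]`) infers a schema from every row in the batch at the cost of an extra pass.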
