I'm reading a stream from Kafka and converting the value from Kafka (which is JSON) into a structure.
from_json has a variant that takes a schema of type String, but I could not find a sample. Please advise what is wrong with the code below.
Error

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING ) ) )
-------^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

Program
public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession.builder()
            .appName(EmployeeSchemaLoader.class.getName())
            .master(master)
            .getOrCreate();

    String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, "
            + "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics)
            .load();
    employeeDataset.printSchema();

    employeeDataset = employeeDataset.withColumn("strValue",
            employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));
    employeeDataset.printSchema();

    employeeDataset.createOrReplaceTempView("employeeView");
    sparkSession.catalog().listTables().show();
    sqlCtx.sql("select * from employeeView").show();
}
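For reference, the same ParseException can be reproduced without Kafka or streaming at all, which suggests the problem is in how the schema string itself is parsed rather than in the Kafka setup. Below is a trimmed-down, standalone sketch (my own construction: the class name FromJsonRepro and the inline one-row Dataset are assumptions, not part of my real job) that passes the identical schema string to from_json on a batch Dataset:

```java
import java.util.HashMap;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class FromJsonRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("FromJsonRepro")
                .master("local[*]")
                .getOrCreate();

        // Same schema string as in the streaming job above.
        String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, "
                + "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) ";

        // A single-row batch Dataset standing in for the Kafka value column.
        Dataset<Row> df = spark.sql("SELECT '{\"firstName\":\"Jane\"}' AS strValue");

        // This call throws the same
        // org.apache.spark.sql.catalyst.parser.ParseException,
        // while still building the query plan (before any action runs).
        df.withColumn("employeeRecord",
                functions.from_json(df.col("strValue"), employeeSchema, new HashMap<>()))
          .show();

        spark.stop();
    }
}
```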