Skip to main content
AI Assist is now on Stack Overflow. Start a chat to get instant answers from across the network. Sign up to save and share your chats.
added 28 characters in body; edited tags
Source Link
zero323
  • 331.4k
  • 108
  • 982
  • 958

I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:

val enriched = df.select($"value" cast "string" as "json") .select(from_json($"json", schema) as "data") .select("data.*") 
val enriched = df.select($"value" cast "string" as "json") .select(from_json($"json", schema) as "data") .select("data.*") 

ds is a DataFrame with the data consumed from Kafka.

The problem comes when I try to read is as JSON in order to do faster queries. the function that comes from org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?

I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:

val enriched = df.select($"value" cast "string" as "json") .select(from_json($"json", schema) as "data") .select("data.*") 

ds is a DataFrame with the data consumed from Kafka.

The problem comes when I try to read is as JSON in order to do faster queries. the function that comes from org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?

I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:

val enriched = df.select($"value" cast "string" as "json") .select(from_json($"json", schema) as "data") .select("data.*") 

ds is a DataFrame with the data consumed from Kafka.

The problem comes when I try to read is as JSON in order to do faster queries. the function that comes from org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?

general improvements
Link
Jacek Laskowski
  • 75k
  • 28
  • 253
  • 440

How to use from_json with schema inference, i.e. without specifying schema explicitlyto allow for messages to have different fields?

general improvements
Source Link
Jacek Laskowski
  • 75k
  • 28
  • 253
  • 440

Structured Spark Streaming How to use from_json inferwith schema inference, i.e. without specifying schema explicitly?

I am trying to process data from Kafka using Structured Spark streamingStructured Streaming. The code for ingesting the data is as follows:

 val enriched = df.selectExpr("cast select(value$"value" ascast string)"string" as json""json")   .select(from_json($"json", schema). as( "data"))   .select("data.*") // Expand the data into columns 

dsds is a dataframeDataFrame with the data consumed form kafkafrom Kafka. Up to there no problem, the

The problem comes when I try to read is as jsonJSON in order to faster do thefaster queries. the function that comes from org.apache.spark.sql.functions from_json()org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?

Is there any workaround?

Cheers!

Structured Spark Streaming from_json infer schema

I am trying to process data from Kafka using Structured Spark streaming. The code for ingesting the data is as follows:

 val enriched = df.selectExpr("cast (value as string) as json")   .select(from_json($"json", schema).as("data"))   .select("data.*") // Expand the data into columns 

ds is a dataframe with the data consumed form kafka. Up to there no problem, the problem comes when I try to read is as json in order to faster do the queries. the function that comes from org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?

Is there any workaround?

Cheers!

How to use from_json with schema inference, i.e. without specifying schema explicitly?

I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:

val enriched = df.select($"value" cast "string" as "json") .select(from_json($"json", schema) as "data") .select("data.*") 

ds is a DataFrame with the data consumed from Kafka.

The problem comes when I try to read is as JSON in order to do faster queries. the function that comes from org.apache.spark.sql.functions from_json() is asking obligatory for a schema. What if the messages have some different fields?

Source Link
ppanero
  • 327
  • 6
  • 18
Loading