3

Assume I have a partition which looks like this

part1: {"customerId":"1","name":"a"} {"customerId":"2","name":"b"} 

Assume I would like to change the Schema of this to Something like

{"data":"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]} 

what I tried doing was

case class Customer(customerId:Option[String],name:Option[String]) case class Customers(customers:Option[Seq[Customer]]) case class Datum(data:Option[Customers]) 

I tried reading the partition as Json and converting to Dataframe.

val inputJson = spark.read.format("json").load("part1") inputJson.as[Datum] 

Somehow the Dataframe doesnt seem to implicitly infer the schema.

3
  • 1
    do you want to put all the data in a single row? Commented Mar 12, 2019 at 15:10
  • Yes. I do need them in a single row Commented Mar 12, 2019 at 15:37
  • 2
    By partition, you mean a single file? Because if you only have one file that can fit in a single node, you can skip spark and go for a more custom program, using circe or other json parsing library to make the transformation. If you still go for spark, your code will look something like: inputJson.as[Customer].mapPartitions(partition => { List(Datum(Some(Customers(Some(partition.toList)))))) }) This should do what you need Commented Mar 12, 2019 at 16:32

2 Answers 2

2

By having this structure I believe you are hiding/wrapping the really useful information of your data. The only useful information here is: {"customerId":"1","name":"a"},{"customerId":"2","name":"b"} customers along with datum will just hide the data that you really need. In order to access the data right now you must 1st slightly change your data to:

{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]} 

And then access this JSON with the next code:

case class Customer(customerId:String, name:String) case class Data(customers: Array[Customer]) val df = spark.read.json(path).as[Data] 

If try to print this dataframe you get:

+----------------+ | customers| +----------------+ |[[1, a], [2, b]]| +----------------+ 

which of course is your data wrapped into arrays. Now comes the interesting part, in order to access this you must do something as the following:

df.foreach{ data => data.customers.foreach(println _) } 

This will print:

Customer(1,a) Customer(2,b) 

which is the real data that you need but not easily accessed at all.

EDIT:

Instead of using 2 classes I would use just one, the Customer class. Then leverage the build-in Spark filters for selecting inner JSON objects. Finally you can explode each array of customers and generate from the exploded column a strongly type dataset of class Customer.

Here is the final code:

case class Customer(customerId:String, name:String) val path = "C:\\temp\\json_data.json" val df = spark.read.json(path) df.select(explode($"data.customers")) .map{ r => Customer(r.getStruct(0).getString(0), r.getStruct(0).getString(1))} .show(false) 

And the output:

+----------+----+ |customerId|name| +----------+----+ |1 |a | |2 |b | +----------+----+ 
Sign up to request clarification or add additional context in comments.

4 Comments

This seems more like a workaround to be fair. The situation I have is something like this. I have a Spark job which dumps the json in the format and is consumed by many other downstream jobs.
Hello @Rakshith as fas as I know the 1st approach is the way to read your JSON data in this form. The disadvantage is you must create a dummy/container class (you called it Data in this case) to hold the data that you really need. In this case you are forced to access your data through something like data.customers on the other side there is a solution. You can 1st explode the content of customers, in order to retrieve all the records of customers. Then flatMap() to generate the final dataset containing the columns [customerId, name] as shown in the last output.
Seems the thing that i was looking for. Accepted the answer
Hello @Rakshith sorry for not answering before. I was kind of busy :) I have update my answer with a more elegant way for retrieving your JSON into the Customer object as discussed above
0

I ended up manipulating the dataframe itself

val inputJson = spark.read.format("json").load("part1") val formatted = inputJson.withColumn("dummy",lit(1)).groupBy("dummy") .agg(collect_list(struct(dataFrame.col("*"))).alias("customers")) val finalFormatted=formatted.withColumn("data",struct(col("customers"))) .select("data") 

Now when i do

finalFormatted.printSchema 

I get the schema that i need

 |-- data: struct (nullable = false) | |-- customers: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- customerId: string (nullable = true) | | | |-- name: string (nullable = true) 

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.