
I have a large nested NDJSON (newline-delimited JSON) file that I need to read into a single Spark DataFrame and save to Parquet. In an attempt to flatten the schema, I use this function:

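import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Recursively collect one Column per leaf field, using dotted paths for nested structs.
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}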

on the DataFrame that is returned by reading the file with

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

I've also switched this to val df = spark.read.json(path), which only works with NDJSON and not with multi-line JSON. Same error.
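For reference, here is a minimal sketch of how a function like this would be applied to the loaded DataFrame. The sample records, the object name FlattenExample, and the variant flattenWithAlias are hypothetical and only for illustration. The variant aliases each leaf with its full path, because selecting col("user.name") alone produces a column named just "name", and leaf names from different structs can collide.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object FlattenExample {
  // Same traversal as flattenSchema, but each leaf is aliased with its full
  // path (dots replaced by underscores) so that leaf names stay unique.
  def flattenWithAlias(schema: StructType, prefix: String = null): Array[Column] =
    schema.fields.flatMap { f =>
      val colName = if (prefix == null) f.name else s"$prefix.${f.name}"
      f.dataType match {
        case st: StructType => flattenWithAlias(st, colName)
        case _              => Array(col(colName).alias(colName.replace(".", "_")))
      }
    }

  // Builds a tiny DataFrame from two hypothetical NDJSON lines and flattens it.
  // The second line carries a field ("age") that the first one lacks.
  def flattenSample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val lines = Seq(
      """{"id": 1, "user": {"name": "a"}}""",
      """{"id": 2, "user": {"name": "b", "age": 30}}"""
    ).toDS()
    val df = spark.read.json(lines)
    df.select(flattenWithAlias(df.schema): _*)
  }
}
```

On a real file the call would be df.select(flattenWithAlias(df.schema): _*) on the DataFrame returned by spark.read.json(path). This only changes the column naming; it does not by itself address the memory error.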

Both variants fail with an out-of-memory error on the workers: java.lang.OutOfMemoryError: Java heap space.

I've altered the JVM memory options and the Spark executor/driver options, to no avail.

Is there a way to stream the file, flatten the schema, and add to a DataFrame incrementally? Some lines of the JSON contain fields that do not appear in the preceding entries, so those would need to be filled in later.

2 Answers


No workaround. The issue was with the JVM object limit. I ended up using a Scala JSON parser and building the DataFrame manually.


You can achieve this in multiple ways.

First, while reading, you can either provide a schema for the DataFrame that reads the JSON or let Spark infer the schema itself.
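A minimal sketch of the first option, assuming a hypothetical schema with fields id, user.name and user.age (replace these with the fields your file actually contains). With a supplied schema Spark does not need a separate inference pass over the data, and a field that is missing from a given line simply comes back as null.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ExplicitSchemaExample {
  // Hypothetical schema, for illustration only.
  val schema: StructType = StructType(Seq(
    StructField("id", LongType),
    StructField("user", StructType(Seq(
      StructField("name", StringType),
      StructField("age", LongType)
    )))
  ))

  // Read newline-delimited JSON with the schema above instead of inferring it.
  def readWithSchema(spark: SparkSession, path: String): DataFrame =
    spark.read.schema(schema).json(path)
}
```

The trade-off is that the schema has to list every field you care about up front; fields present in the file but absent from the schema are not read.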

Once the JSON is in a DataFrame, you can flatten it in the following ways.

a. Using explode() on the DataFrame to flatten it.

b. Using Spark SQL and accessing the nested fields with the . operator. You can find examples here
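Both approaches can be sketched together as follows. The column names id, user.name and tags are hypothetical, and the object name ExplodeExample is only for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

object ExplodeExample {
  // Expects a DataFrame with columns: id, user (a struct with a name field),
  // and tags (an array). These names are assumptions for this sketch.
  def flatten(df: DataFrame): DataFrame =
    df.select(
      col("id"),
      col("user.name").alias("user_name"), // nested field via the dot operator
      explode(col("tags")).alias("tag")    // one output row per array element
    )
}
```

Note that explode drops rows whose array is null or empty; explode_outer keeps them with a null value instead. The same dot access works in SQL after registering the DataFrame with createOrReplaceTempView, for example SELECT id, user.name FROM some_view.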

Lastly, if you want to add new columns to the DataFrame:

a. Using withColumn() is one approach. However, this is done once for each new column added and runs over the entire dataset.

b. Using SQL to generate a new DataFrame from the existing one. This may be the easiest.

c. Using map: access the elements, get the old schema, add the new values, create the new schema, and finally build the new DataFrame, as below.

A single withColumn call operates on the entire dataset, so it is generally not good practice to call it once for every column you want to add. Instead, you can work with the columns and their data inside one map function. Because a single map does the whole job, all the new columns are computed in one parallel pass.

a. Gather the new values based on your calculations

b. Add these new column values to the main RDD as below

val newColumns: Seq[Any] = Seq(newcol1, newcol2)
// .init drops the row's last value before the new ones are appended
Row.fromSeq(row.toSeq.init ++ newColumns)

Here row is the reference to the current row inside the map function

c. Create new schema as below

val newColumnsStructType = StructType(Seq(StructField("newColName1", IntegerType), StructField("newColName2", IntegerType)))

d. Add to the old schema

// .init drops the last field of the old schema, matching row.toSeq.init in step b
val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. Create new dataframe with new columns

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
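Steps a to e can be put together as in the following sketch. It assumes a hypothetical integer input column n and two derived columns, doubled and squared. Unlike the snippets above, it keeps every existing column (no .init) and uses a SparkSession rather than sqlContext.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object AddColumnsExample {
  // Appends two derived integer columns in a single pass over the rows.
  def addColumns(spark: SparkSession, df: DataFrame): DataFrame = {
    // a + b: compute the new values and append them to each row
    val newRDD = df.rdd.map { row =>
      val n = row.getAs[Int]("n") // hypothetical input column
      val newColumns: Seq[Any] = Seq(n * 2, n * n)
      Row.fromSeq(row.toSeq ++ newColumns)
    }
    // c + d: extend the old schema with the new fields, in the same order
    val newSchema = StructType(df.schema.fields ++ Seq(
      StructField("doubled", IntegerType),
      StructField("squared", IntegerType)
    ))
    // e: build the new DataFrame from the rows and the extended schema
    spark.createDataFrame(newRDD, newSchema)
  }
}
```

The order of the appended values must match the order of the appended fields, since createDataFrame pairs them by position.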

6 Comments

How does it address java.lang.OutOfMemoryError resulting from wholeTextFiles?
I was addressing "Is there a way to stream the file, flatten the schema, and add to a dataframe incrementally? Some lines of the JSON contain new fields from the preceding entries...so those would need to be filled in later." I see no question about resolving the memory issue, so I gave multiple approaches.
If NDJ is JSONL, then OP shouldn't use wholeTextFiles. If it is not, this won't help.
Everything you can do with textFile() can also be done with wholeTextFiles(), but with worse performance. The reverse is not true, because of multi-line scenarios. Since this is NDJ, as mentioned, the best option is to move away from wholeTextFiles and use the alternatives mentioned above. Let me know if I am missing anything.
I was using wholeTextFiles to account for multi-line JSON and switched to the normal val df = spark.read.json(path). Yet this still gives me a Java OOM. I've tried increasing the memory with -Xmx options, and I've also increased the memory on the driver/executor nodes. The JSON file is ~1 GB and I have 16 GB available to test on my laptop.