65

All,

Is there an elegant and accepted way to flatten a Spark SQL table (Parquet) with columns that are of nested StructType?

For example

If my schema is:

    foo
     |_bar
     |_baz
    x
    y
    z

How do I select it into a flattened tabular form without resorting to manually running

df.select("foo.bar","foo.baz","x","y","z") 

In other words, how do I obtain the result of the above code programmatically, given just a StructType and a DataFrame?

5
  • Have you tried using the explode DataFrame method? Commented May 26, 2016 at 22:37
  • 2
    Don't think explode is going to do it. explode creates new rows -- he wants to add columns. I think you need to work with Column objects. Commented May 27, 2016 at 2:42
  • Sorry, my mistake. Commented May 27, 2016 at 13:55
  • I mean, I'm sure I could do it with explode -- explode actually does let you create new columns. I just don't think it would be very elegant -- you would probably have to do the schema reflection for every record, instead of front-loading the schema reflection to only do it once to create the select(...) Commented May 27, 2016 at 15:57
  • 1
    Solution directly from databricks: github.com/delta-io/delta/blob/… Commented Aug 14, 2019 at 8:25

14 Answers

109

The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].

Something like:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.types.{StructField, StructType}
    import org.apache.spark.sql.functions.col

    def flattenSchema(schema: StructType): Seq[Column] = schema.fields.toSeq.flatMap {
      // Struct field: recurse and prefix each nested column with the parent name.
      // A Column's string form is its (possibly dotted) name.
      case StructField(name, inner: StructType, _, _) =>
        flattenSchema(inner).map(sub => col(s"$name.$sub"))
      // Leaf field: select it as is.
      case StructField(name, _, _, _) => Seq(col(name))
    }

You would then use it like this:

df.select(flattenSchema(df.schema):_*) 
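
To see the recursion in isolation, here is a minimal Python sketch of the same schema walk. It is not the code from this answer: it assumes the schema is available as the plain dictionary that PySpark's `StructType.jsonValue()` returns (a `fields` list whose entries carry `name` and `type`), and the name `flatten_paths` is made up for illustration.

```python
def flatten_paths(schema, prefix=None):
    """Return the dotted paths of all leaf fields in a schema dictionary.

    Assumption: `schema` has the shape produced by StructType.jsonValue(),
    i.e. {"type": "struct", "fields": [{"name": ..., "type": ...}, ...]}.
    """
    paths = []
    for field in schema["fields"]:
        name = f"{prefix}.{field['name']}" if prefix else field["name"]
        dtype = field["type"]
        # Nested structs are dictionaries with type "struct"; primitives are strings.
        if isinstance(dtype, dict) and dtype.get("type") == "struct":
            paths.extend(flatten_paths(dtype, prefix=name))
        else:
            paths.append(name)
    return paths


# The schema from the question: foo {bar, baz}, x, y, z.
example_schema = {
    "type": "struct",
    "fields": [
        {"name": "foo", "type": {"type": "struct", "fields": [
            {"name": "bar", "type": "string"},
            {"name": "baz", "type": "string"},
        ]}},
        {"name": "x", "type": "string"},
        {"name": "y", "type": "string"},
        {"name": "z", "type": "string"},
    ],
}

print(flatten_paths(example_schema))
# → ['foo.bar', 'foo.baz', 'x', 'y', 'z']
```

With a real DataFrame, the resulting list of strings could presumably be handed to `df.select(...)`, for example `df.select(flatten_paths(df.schema.jsonValue()))`.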

14 Comments

Thank you, this seems like a very reasonable solution.
Using this solution, how can I handle lowest level child nodes which have identical names? For example parent element Foo has child Bar and parent element Foz also has a separate child named Bar. When selecting Foo.Bar and Foz.Bar from initial dataframe (with the Array returned by flattenSchema), I get 2 columns both named Bar. But I would like column headers as Foo.Bar or Foo_Bar or something like that. So every one of them would be unique and unambiguous.
In what version of Spark is the above solution applicable? In Spark 2.1.0 (Java API) it doesn't look like StructField's type can ever be a StructType.
TheM00s3, you would import org.apache.spark.sql.functions.col, and it should work in Spark 2.1.x as well (only tried this in Scala so far, not Java).
Just in case someone else stumbled on this: if you want the new column names to reflect the nested structure of the original schema: f1.nested1.nested2 ... you should alias the columns at this line: case _ => Array(col(colName)) should become case _ => Array(col(colName).alias(colName))
45

Just wanted to share my solution for Pyspark - it's more or less a translation of @David Griffin's solution, so it supports any level of nested objects.

    from pyspark.sql.types import StructType, ArrayType

    def flatten(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType

            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(name)

        return fields

    df.select(flatten(df.schema)).show()

3 Comments

I am running into an error, likely due to a heavily nested JSON schema, but I am not entirely sure what it means: "cannot resolve 'item.productOrService.coding['code']' due to data type mismatch: argument 2 requires integral type, however, ''code'' is of string type." Any ideas? I am entirely new to JSON but I would suspect the array within the struct is an issue.
@user1983682 Please, open your case, so we could see your schema with details.
To also avoid using the child names as column names, we can use a list comprehension of alias like so: deep_names = flatten(df.schema, ''); alias_list = [col(entry).alias(entry) for entry in deep_names]; df.select(*alias_list)
29

I am improving my previous answer and offering a solution to my own problem stated in the comments of the accepted answer.

The accepted solution creates an array of Column objects and uses it to select these columns. In Spark, if you have a nested DataFrame, you can select a child column like this: df.select("Parent.Child"). This returns a DataFrame containing the values of the child column, and the column is named Child. But if attributes of different parent structures share the same name, you lose the information about the parent and may end up with identical column names that you can no longer access by name because they are ambiguous.

This was my problem.

I found a solution to my problem, maybe it can help someone else as well. I called the flattenSchema separately:

val flattenedSchema = flattenSchema(df.schema) 

and this returned an Array of Column objects. Instead of using this in the select(), which would return a DataFrame with columns named by the child of the last level, I mapped the original column names to themselves as strings, then after selecting Parent.Child column, it renames it as Parent.Child instead of Child (I also replaced dots with underscores for my convenience):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_"))) 

And then you can use the select function as shown in the original answer:

var newDf = df.select(renamedCols:_*) 
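
The renaming step can be illustrated without Spark. The following plain-Python sketch pairs each dotted path with an alias that keeps the parent names; the helper `unique_aliases` and its collision check are hypothetical additions, not part of the answer above.

```python
def unique_aliases(paths, delimiter="_"):
    """Pair each dotted column path with an alias that keeps the parent names.

    Raises ValueError if two different paths would end up with the same alias,
    which can happen when a field name already contains the delimiter.
    """
    pairs = [(path, path.replace(".", delimiter)) for path in paths]
    aliases = [alias for _, alias in pairs]
    duplicates = {alias for alias in aliases if aliases.count(alias) > 1}
    if duplicates:
        raise ValueError(f"alias collision: {sorted(duplicates)}")
    return pairs


pairs = unique_aliases(["Foo.Bar", "Foz.Bar", "x"])
print(pairs)
# → [('Foo.Bar', 'Foo_Bar'), ('Foz.Bar', 'Foz_Bar'), ('x', 'x')]
```

In PySpark, such pairs could then be turned into columns along the lines of `df.select([col(path).alias(alias) for path, alias in pairs])`.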

10 Comments

Hi @V. Samma, This solution is great. However, what will the code be in case of child attributes with the same name and from a single parent attr. ? e.g. { "batter": [ { "id": "1001", "type": "Regular" }, { "id": "1002", "type": "Chocolate" }, { "id": "1003", "type": "Blueberry" }, { "id": "1004", "type": "Devil's Food" } ] }
@vsdaking Hi, thanks. I'm sure I have tackled a problem where you want data from a JSON array. Unfortunately, some time has passed and I don't have access to Spark currently to test as well. The child attributes with the same name is not a problem because they would need to be the column names for your final DF I presume. You just have to search for how to read JSON Arrays in Spark. Maybe the explode command will help you with that.
thanks for this @V.Samma I've used this for my problem, however it creates a very wide dataframe, I actually need my nested Struct Types as new Rows in my Dataframe. Any advise on this would be appreciated
@ukbaz Of course, it takes all nested child properties and flattens them schema-wise, which actually means that they are now a separate column for the dataframe. That was the goal of my solution. I am struggling to understand what do you need exactly. If you have columns ID, Person, Address but schema is like: "ID", "Person.Name", "Person.Age", "Address.City", "Address.Street", "Address.Country", then by flattening, the initial 3 columns create 6 columns. What's the result you would want based on my example?
@mythic Unfortunately, I haven't been working with Spark for a few years but it seems you already got help with that :)
5

I added a DataFrame#flattenSchema method to the open source spark-daria project.

Here's how you can use the function with your code.

    import com.github.mrpowers.spark.daria.sql.DataFrameExt._

    df.flattenSchema().show()

    +-------+-------+---------+----+---+
    |foo.bar|foo.baz|        x|   y|  z|
    +-------+-------+---------+----+---+
    |   this|     is|something|cool| ;)|
    +-------+-------+---------+----+---+

You can also specify different column name delimiters with the flattenSchema() method.

    df.flattenSchema(delimiter = "_").show()

    +-------+-------+---------+----+---+
    |foo_bar|foo_baz|        x|   y|  z|
    +-------+-------+---------+----+---+
    |   this|     is|something|cool| ;)|
    +-------+-------+---------+----+---+

This delimiter parameter is surprisingly important. If you're flattening your schema to load the table in Redshift, you won't be able to use periods as the delimiter.

Here's the full code snippet to generate this output.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val data = Seq(
      Row(Row("this", "is"), "something", "cool", ";)")
    )

    val schema = StructType(
      Seq(
        StructField(
          "foo",
          StructType(
            Seq(
              StructField("bar", StringType, true),
              StructField("baz", StringType, true)
            )
          ),
          true
        ),
        StructField("x", StringType, true),
        StructField("y", StringType, true),
        StructField("z", StringType, true)
      )
    )

    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data),
      StructType(schema)
    )

    df.flattenSchema().show()

The underlying code is similar to David Griffin's code (in case you don't want to add the spark-daria dependency to your project).

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StructType

    object StructTypeHelpers {
      def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
        schema.fields.flatMap(structField => {
          // codeColName is the path used to select the column; colName is the output alias.
          val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
          val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

          structField.dataType match {
            // Note: the prefix is passed on with the delimiter already applied, so with a
            // delimiter other than "." columns nested three or more levels deep may not resolve.
            case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
            case _ => Array(col(codeColName).alias(colName))
          }
        })
      }
    }

    object DataFrameExt {
      implicit class DataFrameMethods(df: DataFrame) {
        def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
          df.select(
            StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
          )
        }
      }
    }

1 Comment

Can we add support for Array<Struct> and for Array?
4

========== edit ====

There's some additional handling for more complex schemas here: https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44

==================

PySpark, added to @Evan V's answer, when your field-names have special characters, like a dot '.', a hyphen '-', ...:

    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, ArrayType

    def normalise_field(raw):
        return raw.strip().lower() \
                .replace('`', '') \
                .replace('-', '_') \
                .replace(' ', '_') \
                .strip('_')

    def flatten(schema, prefix=None):
        fields = []
        for field in schema.fields:
            # Quote every path segment with backticks so special characters survive.
            name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType
            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(col(name).alias(normalise_field(name)))

        return fields

    df.select(flatten(df.schema)).show()

Comments

3

You could also use SQL to select columns as flat.

  1. Get original data-frame schema
  2. Generate SQL string, by browsing schema
  3. Query your original data-frame

I did an implementation in Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(It uses a recursive method as well. I prefer the SQL way, since you can test it easily via spark-shell.)
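
The three steps above can be sketched in Python. This is not the linked Java gist: it is a rough illustration that assumes the schema is given as the dictionary returned by PySpark's `StructType.jsonValue()`, that field names contain no backticks, and that the view name (`nested` here) is whatever the DataFrame was registered under. The helper names `leaf_paths` and `build_select_sql` are hypothetical.

```python
def leaf_paths(schema, parents=()):
    """Yield the path of every leaf field as a tuple of name segments."""
    for field in schema["fields"]:
        path = parents + (field["name"],)
        dtype = field["type"]
        if isinstance(dtype, dict) and dtype.get("type") == "struct":
            yield from leaf_paths(dtype, path)
        else:
            yield path


def build_select_sql(schema, table):
    """Build a SELECT statement that exposes every leaf field as a flat column."""
    parts = []
    for path in leaf_paths(schema):
        expr = ".".join(f"`{segment}`" for segment in path)  # quoted column path
        alias = "_".join(path)                               # flat, unique alias
        parts.append(f"{expr} AS `{alias}`")
    return f"SELECT {', '.join(parts)} FROM {table}"


schema = {
    "type": "struct",
    "fields": [
        {"name": "foo", "type": {"type": "struct", "fields": [
            {"name": "bar", "type": "string"},
        ]}},
        {"name": "x", "type": "string"},
    ],
}

print(build_select_sql(schema, "nested"))
# → SELECT `foo`.`bar` AS `foo_bar`, `x` AS `x` FROM nested
```

The generated string could then be passed to `spark.sql(...)` after registering the DataFrame as a temporary view.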

Comments

3

Here is a function that is doing what you want and that can deal with multiple nested columns containing columns with same name, with a prefix:

    from pyspark.sql import functions as F

    def flatten_df(nested_df):
        flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
        nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

        flat_df = nested_df.select(flat_cols +
                                   [F.col(nc+'.'+c).alias(nc+'_'+c)
                                    for nc in nested_cols
                                    for c in nested_df.select(nc+'.*').columns])
        return flat_df

Before:

    root
     |-- x: string (nullable = true)
     |-- y: string (nullable = true)
     |-- foo: struct (nullable = true)
     |    |-- a: float (nullable = true)
     |    |-- b: float (nullable = true)
     |    |-- c: integer (nullable = true)
     |-- bar: struct (nullable = true)
     |    |-- a: float (nullable = true)
     |    |-- b: float (nullable = true)
     |    |-- c: integer (nullable = true)

After:

    root
     |-- x: string (nullable = true)
     |-- y: string (nullable = true)
     |-- foo_a: float (nullable = true)
     |-- foo_b: float (nullable = true)
     |-- foo_c: integer (nullable = true)
     |-- bar_a: float (nullable = true)
     |-- bar_b: float (nullable = true)
     |-- bar_c: integer (nullable = true)

Comments

3

To combine David Griffin's and V. Samma's answers, you could just do this to flatten while avoiding duplicate column names:

    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
      schema.fields.flatMap(f => {
        val colName = if (prefix == null) f.name else (prefix + "." + f.name)

        f.dataType match {
          case st: StructType => flattenSchema(st, colName)
          // Alias each leaf with its full path, using underscores instead of dots.
          case _ => Array(col(colName).as(colName.replace(".", "_")))
        }
      })
    }

    def flattenDataFrame(df: DataFrame): DataFrame = {
      df.select(flattenSchema(df.schema): _*)
    }

    var my_flattened_json_table = flattenDataFrame(my_json_table)

Comments

3

This is a modification of the solution that is tail-recursive (annotated with @tailrec):

    import scala.annotation.tailrec
    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{StructField, StructType}

    @tailrec
    def flattenSchema(
        splitter: String,
        fields: List[(StructField, String)],
        acc: Seq[Column]): Seq[Column] = {
      fields match {
        // Struct field: queue its children with the extended prefix.
        case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
          val newPrefix = s"$prefix${field.name}."
          val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
          flattenSchema(splitter, tail ++ newFields, acc)
        // Leaf field: emit an aliased column.
        case (field, prefix) :: tail =>
          val colName = s"$prefix${field.name}"
          val newCol = col(colName).as(colName.replace(".", splitter))
          flattenSchema(splitter, tail, acc :+ newCol)
        case _ => acc
      }
    }

    def flattenDataFrame(df: DataFrame): DataFrame = {
      val fields = df.schema.fields.map((_, ""))
      df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
    }

1 Comment

This tailrec version of the DataFrame flattening works really well for struct-type DataFrames. Can you please add a case to handle array types with explode? It would be really helpful for me. Thanks in advance.
2

A little addition to the code above, if you are working with Nested Struct and Array.

    def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
      schema.fields.flatMap(f => {
        val colName = if (prefix == null) f.name else (prefix + "." + f.name)

        f match {
          case StructField(_, struct: StructType, _, _) => flattenSchema(struct, colName)
          case StructField(_, ArrayType(x: StructType, _), _, _) => flattenSchema(x, colName)
          case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
          case _ => Array(col(colName))
        }
      })
    }

3 Comments

I am attempting to implement this logic into the spark suggestion given by Evan V but cannot seem to get the code right for the Struct within Array type--I would appreciate the help if anyone has ideas.
Can we add a depth to scan while flattening the schema ?
I am trying to use it, but it is not giving the proper output. I have a, a.b, a.b.c, a.b.d, but it does not flatten the last child level.
1

I have been using one liners which result in a flattened schema with 5 columns of bar, baz, x, y, z:

df.select("foo.*", "x", "y", "z") 

As for explode: I typically reserve explode for flattening a list. For example if you have a column idList that is a list of Strings, you could do:

    df.withColumn("flattenedId", functions.explode(col("idList")))
      .drop("idList")

That will result in a new Dataframe with a column named flattenedId (no longer a list)

Comments

1

This is based on @Evan V's solution, extended to deal with more heavily nested JSON files. For me, the problem with the original solution is that I get an error when an ArrayType is nested directly in another ArrayType.

For example, if the JSON looks like:

{"e":[{"f":[{"g":"h"}]}]} 

I will get an error:

"cannot resolve '`e`.`f`['g']' due to data type mismatch: argument 2 requires integral type 

To solve this I modified the code a bit. I agree this looks super stupid, but I am just posting it here so that someone may come up with a nicer solution.

    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    def flatten(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, T.StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(name)
        return fields

    def explodeDF(df):
        # Explode every array column, one level at a time.
        for (name, dtype) in df.dtypes:
            if "array" in dtype:
                df = df.withColumn(name, F.explode(name))
        return df

    def df_is_flat(df):
        for (_, dtype) in df.dtypes:
            if ("array" in dtype) or ("struct" in dtype):
                return False
        return True

    def flatJson(jdf):
        # Alternate between flattening structs and exploding arrays until nothing nested is left.
        keepGoing = True
        while(keepGoing):
            fields = flatten(jdf.schema)
            new_fields = [item.replace(".", "_") for item in fields]
            jdf = jdf.select(fields).toDF(*new_fields)
            jdf = explodeDF(jdf)
            if df_is_flat(jdf):
                keepGoing = False
        return jdf

Usage:

    df = spark.read.json(path_to_json)
    flat_df = flatJson(df)

    flat_df.show()
    +---+---+-----+
    |  a|e_c|e_f_g|
    +---+---+-----+
    |  b|  d|    h|
    +---+---+-----+

1 Comment

Use explode_outer to get nulls for null arrays
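
To make the difference concrete, here is a plain-Python model of the documented behaviour of Spark's explode and explode_outer, using lists of dictionaries in place of a DataFrame. It is only an illustration of the semantics, not Spark code; the function and column names are chosen for this example.

```python
def explode(rows, column):
    """Model of explode: rows whose array is null or empty are dropped."""
    out = []
    for row in rows:
        for item in row[column] or []:
            out.append({**row, column: item})
    return out


def explode_outer(rows, column):
    """Model of explode_outer: null or empty arrays yield one row with null."""
    out = []
    for row in rows:
        for item in row[column] or [None]:
            out.append({**row, column: item})
    return out


rows = [
    {"id": 1, "tags": ["a", "b"]},
    {"id": 2, "tags": None},
    {"id": 3, "tags": []},
]

print(explode(rows, "tags"))
# → [{'id': 1, 'tags': 'a'}, {'id': 1, 'tags': 'b'}]
print(explode_outer(rows, "tags"))
# → [{'id': 1, 'tags': 'a'}, {'id': 1, 'tags': 'b'}, {'id': 2, 'tags': None}, {'id': 3, 'tags': None}]
```

In the explodeDF helper above, swapping F.explode for F.explode_outer would therefore keep the rows with ids 2 and 3 instead of silently dropping them.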
0

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.StructType
    import scala.collection.mutable.ListBuffer

    // Collects the dotted paths of all leaf columns.
    val columns = new ListBuffer[String]()

    def flattenSchema(schema: StructType, prefix: String = null): Unit = {
      for (i <- schema.fields) {
        if (i.dataType.isInstanceOf[StructType]) {
          // Carry the parent prefix along so that deeper levels keep the full path.
          val columnPrefix = (if (prefix == null) "" else prefix) + i.name + "."
          flattenSchema(i.dataType.asInstanceOf[StructType], columnPrefix)
        } else {
          if (prefix == null) columns += i.name
          else columns += (prefix + i.name)
        }
      }
    }

Comments

0

Combining Evan V's, Avrell and Steco ideas. I am also providing a complete SQL syntax while handling query fields with special characters using '`' in PySpark.

The solution below gives the following,

  1. Handle Nested JSON Schema.
  2. Handle same column names across nested columns (We will give alias name of the entire hierarchy separated by underscores).
  3. Handle Special Characters. (We replace special characters with '_'. I have not handled consecutive occurrences of '_', but we can do that as well with appropriate 'sub' replacements.)
  4. Gives us SQL syntax.
  5. Query Fields are enclosed within '`'.

Code snippet is below,

    df=spark.read.json('<JSON FOLDER / FILE PATH>')
    df.printSchema()

    from pyspark.sql.types import StructType, ArrayType

    def flatten(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType

            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                # Note: str.replace is literal, so the last replace (a regex-style pattern) has no effect.
                alias_name=name.replace('.','_').replace(' ','_').replace('(','').replace(')','').replace('-','_').replace('&','_').replace(r'(_){2,}',r'\1')
                name=name.replace('.','`.`')
                field_name = "`" + name + "`" + " AS " + alias_name
                fields.append(field_name)
        return fields

    df.createOrReplaceTempView("to_flatten_df")
    query_fields=flatten(df.schema)

    def listToString(s):
        # initialize an empty string
        str1 = ""
        # traverse in the string
        for ele in s:
            str1 = str1 + ele + ','
        # return string
        return str1

    spark.sql("SELECT " + listToString(query_fields)[:-1] + " FROM to_flatten_df" ).show()

Comments
