I added a DataFrame#flattenSchema method to the open source spark-daria project.
Here's how you can use the function with your code.
```scala
import com.github.mrpowers.spark.daria.sql.DataFrameExt._

df.flattenSchema().show()

+-------+-------+---------+----+---+
|foo.bar|foo.baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+
```
You can also specify different column name delimiters with the flattenSchema() method.
```scala
df.flattenSchema(delimiter = "_").show()

+-------+-------+---------+----+---+
|foo_bar|foo_baz|        x|   y|  z|
+-------+-------+---------+----+---+
|   this|     is|something|cool| ;)|
+-------+-------+---------+----+---+
```
This delimiter parameter is surprisingly important. If you're flattening your schema to load the table into Redshift, you won't be able to use periods as the delimiter.
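If Redshift is the destination, a quick pre-load check can catch unsafe column names early. This is a hypothetical snippet (the `badCols` check is not part of spark-daria):

```scala
// Flatten with an underscore delimiter so the column names are Redshift-friendly,
// then verify no periods slipped through (hypothetical pre-load sanity check).
val flat = df.flattenSchema(delimiter = "_")

val badCols = flat.columns.filter(_.contains("."))
require(badCols.isEmpty, s"Column names not safe for Redshift: ${badCols.mkString(", ")}")
```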
Here's the full code snippet to generate this output.
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
  Row(Row("this", "is"), "something", "cool", ";)")
)

val schema = StructType(
  Seq(
    StructField(
      "foo",
      StructType(
        Seq(
          StructField("bar", StringType, true),
          StructField("baz", StringType, true)
        )
      ),
      true
    ),
    StructField("x", StringType, true),
    StructField("y", StringType, true),
    StructField("z", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)

df.flattenSchema().show()
```
The underlying code is similar to David Griffin's code (in case you don't want to add the spark-daria dependency to your project).
```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object StructTypeHelpers {

  def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
    schema.fields.flatMap(structField => {
      // codeColName always uses "." so col() can resolve the nested field
      val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
      // colName uses the caller-supplied delimiter for the flattened column alias
      val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name

      structField.dataType match {
        // recurse into nested structs, carrying the accumulated prefix
        case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
        case _              => Array(col(codeColName).alias(colName))
      }
    })
  }

}

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
      df.select(
        StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
      )
    }

  }

}
```
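If you'd rather not use the implicit class, you can call the helper object directly. A minimal sketch, assuming the `df` built in the snippet above:

```scala
// Build the flattened Column expressions once, then select them.
val flattenedCols = StructTypeHelpers.flattenSchema(df.schema, delimiter = "_")
val flatDf = df.select(flattenedCols: _*)

flatDf.columns
// Array(foo_bar, foo_baz, x, y, z)
```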
A note on `explode`: it isn't the right tool here. `explode` creates new rows, whereas we want to add columns, so you need to work with `Column` objects instead. `explode` technically does let you create new columns, but it wouldn't be very elegant: you'd have to do the schema reflection for every record, instead of front-loading the schema reflection to do it only once when building the `select(...)`.
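To make the rows-vs-columns point concrete, here's a tiny sketch with made-up data, assuming a SparkSession named `spark` (as in the spark-shell):

```scala
import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._

// One input row with an array column...
val arrDf = Seq((1, Seq("a", "b"))).toDF("id", "letters")

// ...becomes two output rows: explode multiplies rows, it doesn't add columns.
arrDf.select(col("id"), explode(col("letters"))).show()

// +---+---+
// | id|col|
// +---+---+
// |  1|  a|
// |  1|  b|
// +---+---+
```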