6

I'm using the approach given here to flatten a DataFrame in Spark SQL. Here is my code:

package com.acme.etl.xml import org.apache.spark.sql.types._ import org.apache.spark.sql.{Column, SparkSession} object RuntimeError { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate() val rowTag = "idocData" val dataFrameReader = spark.read .option("rowTag", rowTag) val xmlUri = "bad_011_1.xml" val df = dataFrameReader .format("xml") .load(xmlUri) val schema: StructType = df.schema val columns: Array[Column] = flattenSchema(schema) val df2 = df.select(columns: _*) } def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { schema.fields.flatMap(f => { val colName: String = if (prefix == null) f.name else prefix + "." + f.name val dataType = f.dataType dataType match { case st: StructType => flattenSchema(st, colName) case _: StringType => Array(new org.apache.spark.sql.Column(colName)) case _: LongType => Array(new org.apache.spark.sql.Column(colName)) case _: DoubleType => Array(new org.apache.spark.sql.Column(colName)) case arrayType: ArrayType => arrayType.elementType match { case structType: StructType => flattenSchema(structType, colName) } case _ => Array(new org.apache.spark.sql.Column(colName)) } }) } } 

Much of the time, this works fine. But for the XML given below:

<Receive xmlns="http://Microsoft.LobServices.Sap/2007/03/Idoc/3/ORDERS05/ZORDERS5/702/Receive"> <idocData> <E2EDP01008GRP xmlns="http://Microsoft.LobServices.Sap/2007/03/Types/Idoc/3/ORDERS05/ZORDERS5/702"> <E2EDPT1001GRP> <E2EDPT2001> <DATAHEADERCOLUMN_DOCNUM>0000000141036013</DATAHEADERCOLUMN_DOCNUM> </E2EDPT2001> <E2EDPT2001> <DATAHEADERCOLUMN_DOCNUM>0000000141036013</DATAHEADERCOLUMN_DOCNUM> </E2EDPT2001> </E2EDPT1001GRP> </E2EDP01008GRP> <E2EDP01008GRP xmlns="http://Microsoft.LobServices.Sap/2007/03/Types/Idoc/3/ORDERS05/ZORDERS5/702"> </E2EDP01008GRP> </idocData> </Receive> 

this exception occurs:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`E2EDP01008GRP`.`E2EDPT1001GRP`.`E2EDPT2001`['DATAHEADERCOLUMN_DOCNUM']' due to data type mismatch: argument 2 requires integral type, however, ''DATAHEADERCOLUMN_DOCNUM'' is of string type.;; 'Project [E2EDP01008GRP#0.E2EDPT1001GRP.E2EDPT2001[DATAHEADERCOLUMN_DOCNUM] AS DATAHEADERCOLUMN_DOCNUM#3, E2EDP01008GRP#0._VALUE AS _VALUE#4, E2EDP01008GRP#0._xmlns AS _xmlns#5] +- Relation[E2EDP01008GRP#0] XmlRelation(<function0>,Some(/Users/paulreiners/s3/cdi-events-partition-staging/content_acme_purchase_order_json_v1/bad_011_1.xml),Map(rowtag -> idocData, path -> /Users/paulreiners/s3/cdi-events-partition-staging/content_acme_purchase_order_json_v1/bad_011_1.xml),null) 

What is causing this?

2
  • Can you please provide the exact code that you are using. Also one sample XML file for which the program works. Also why it is failing is clearly mentioned in the exception message. Commented Apr 20, 2019 at 8:04
  • I added the exact code. The full content of the sample XML file that it fails on was in the original posting and remains there. If the exception message were clear to me, I wouldn't be asking the question. Commented Apr 22, 2019 at 14:26

1 Answer 1

12
+500

Your document contains a multi-valued array so you can't flatten it completely in one pass since you can't give both elements of the array the same column name. Also, it's usually a bad idea to use a dot within a column name since it can easily confuse the Spark parser and will need to be escaped at all time.

The usual way to flatten such a dataset is to create new rows for each element of the array. You can use the explode function to do this but you will need to recursively call your flatten operation because explode can't be nested.

The following code works as expected, using '_' instead of '.' as column name separator:

import org.apache.spark.sql.types._ import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.{Dataset, Row} object RuntimeError { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate() val rowTag = "idocData" val dataFrameReader = spark.read.option("rowTag", rowTag) val xmlUri = "bad_011_1.xml" val df = dataFrameReader.format("xml").load(xmlUri) val df2 = flatten(df) } def flatten(df: Dataset[Row], prefixSeparator: String = "_") : Dataset[Row] = { import org.apache.spark.sql.functions.{col,explode} def mustFlatten(sc: StructType): Boolean = sc.fields.exists(f => f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType]) def flattenAndExplodeOne(sc: StructType, parent: Column = null, prefix: String = null, cols: Array[(DataType,Column)] = Array[(DataType,Column)]()): Array[(DataType,Column)] = { val res = sc.fields.foldLeft(cols)( (columns, f) => { val my_col = if (parent == null) col(f.name) else parent.getItem(f.name) val flat_name = if (prefix == null) f.name else s"${prefix}${prefixSeparator}${f.name}" f.dataType match { case st: StructType => flattenAndExplodeOne(st, my_col, flat_name, columns) case dt: ArrayType => { if (columns.exists(_._1.isInstanceOf[ArrayType])) { columns :+ ((dt, my_col.as(flat_name))) } else { columns :+ ((dt, explode(my_col).as(flat_name))) } } case dt => columns :+ ((dt, my_col.as(flat_name))) } }) res } var flatDf = df while (mustFlatten(flatDf.schema)) { val newColumns = flattenAndExplodeOne(flatDf.schema, null, null).map(_._2) flatDf = flatDf.select(newColumns:_*) } flatDf } } 

The resulting df2 has the following schema and data:

df2.printSchema root |-- E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM: long (nullable = true) |-- E2EDP01008GRP__xmlns: string (nullable = true) df2.show(true) +--------------------------------------------------------------+--------------------+ |E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM|E2EDP01008GRP__xmlns| +--------------------------------------------------------------+--------------------+ | 141036013|http://Microsoft....| | 141036013|http://Microsoft....| +--------------------------------------------------------------+--------------------+ 
Sign up to request clarification or add additional context in comments.

4 Comments

This did solve my original problem. Thanks! However running it on a more complex case, I get this error: "Only one generator allowed per select clause but found 7: explode(E2EDK02), explode(E2EDK03), explode(E2EDK04001), explode(E2EDK14), explode(E2EDKA1003GRP), explode(E2EDP01008GRP), explode(E2EDS01);" If you know a quick fix for this, please let me know. Otherwise, I'll open a new question.
Right now the flattenSchema allows any number of explode at the same level but spark only allows 1 explode per select, so flattenSchema must short circuit the transformation after the first explode is expanded and defer the others for the next iterations. I can try and update tomorrow the answer for the most generic case but it will probably complicate the code quite significantly
I've updated the code with an updated version that should avoid multiple explode in the same step.
How do we deal with arraytype with null data, explode function will return all empty rows, I know we can use explode_outer. In case if I am using spark 2.1 we don't have explode_outer in it . Thanks

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.