
I'd like to create a PySpark DataFrame from a JSON file in HDFS.

The JSON file has the following content:

{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" }, "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }

Then I read this file using PySpark 2.4.4:

df = spark.read.json("/path/file.json")

So, I get a result like this:

df.show(truncate=False)
+---------------------+---------------------------------+
|Price                |Product                          |
+---------------------+---------------------------------+
|[700, 250, 800, 1200]|[Desktop, Tablet, Iphone, Laptop]|
+---------------------+---------------------------------+

But I'd like a dataframe with the following structure:

+-------+--------+
|Price  |Product |
+-------+--------+
|700    |Desktop |
|250    |Tablet  |
|800    |Iphone  |
|1200   |Laptop  |
+-------+--------+

How can I get a DataFrame with the previous structure using PySpark?

I tried to use explode with df.select(explode("Price")), but I got the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                 "An error occurred while calling {0}{1}{2}.\n".
--> 328                 format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o688.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;
'Project [explode(Price#107) AS List()]
+- LogicalRDD [Price#107, Product#108], false
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	...
	at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-46-463397adf153> in <module>
----> 1 df.select(explode("Price"))

/usr/lib/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1200         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1201         """
-> 1202         jdf = self._jdf.select(self._jcols(*cols))
   1203         return DataFrame(jdf, self.sql_ctx)
   1204

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;\n'Project [explode(Price#107) AS List()]\n+- LogicalRDD [Price#107, Product#108], false\n"

3 Answers


Recreating your DataFrame:

from pyspark.sql import functions as F

df = spark.read.json("./row.json")
df.printSchema()
#root
# |-- Price: struct (nullable = true)
# |    |-- 0: long (nullable = true)
# |    |-- 1: long (nullable = true)
# |    |-- 2: long (nullable = true)
# |    |-- 3: long (nullable = true)
# |-- Product: struct (nullable = true)
# |    |-- 0: string (nullable = true)
# |    |-- 1: string (nullable = true)
# |    |-- 2: string (nullable = true)
# |    |-- 3: string (nullable = true)

As shown above in the printSchema output, your Price and Product columns are structs. Thus explode will not work since it requires an ArrayType or MapType.

First, convert the structs to arrays using the .* notation as shown in Querying Spark SQL DataFrame with complex types:

df = df.select(
    F.array(F.expr("Price.*")).alias("Price"),
    F.array(F.expr("Product.*")).alias("Product")
)
df.printSchema()
#root
# |-- Price: array (nullable = false)
# |    |-- element: long (containsNull = true)
# |-- Product: array (nullable = false)
# |    |-- element: string (containsNull = true)

Now since you're using Spark 2.4+, you can use arrays_zip to zip the Price and Product arrays together, before using explode:

df.withColumn("price_product", F.explode(F.arrays_zip("Price", "Product")))\ .select("price_product.Price", "price_product.Product")\ .show() #+-----+----------------+ #|Price| Product| #+-----+----------------+ #| 700|Desktop Computer| #| 250| Tablet| #| 800| iPhone| #| 1200| Laptop| #+-----+----------------+ 

For older versions of Spark, before arrays_zip, you can explode each column separately and join the results back together:

df1 = df\
    .withColumn("price_map", F.explode("Price"))\
    .withColumn("id", F.monotonically_increasing_id())\
    .drop("Price", "Product")

df2 = df\
    .withColumn("product_map", F.explode("Product"))\
    .withColumn("id", F.monotonically_increasing_id())\
    .drop("Price", "Product")

df3 = df1.join(df2, "id", "outer").drop("id")
df3.show()
#+---------+----------------+
#|price_map|     product_map|
#+---------+----------------+
#|      700|Desktop Computer|
#|      250|          Tablet|
#|     1200|          Laptop|
#|      800|          iPhone|
#+---------+----------------+

5 Comments

I wouldn't recommend using concat_ws + split - you're assuming the separator doesn't exist in the data. In any case, you can directly unpack the elements of the struct into an array, without enumerating each of them.
true ... that is where i was getting stuck ... how do you convert a nested struct to an array?
Using .*: Querying Spark SQL DataFrame with complex types. For example: df = df.select(F.array(F.expr("Price.*")).alias("Price"), F.array(F.expr("Product.*")).alias("Product")). I think this followed by arrays_zip is the way to go.
thanks! i agree ... very clean code ... it was the array function i was missing
The ids generated by monotonically_increasing_id() in df1 and df2 are problematic; they can easily get out of sync on a large dataset. I suggest you create this id with df = df.withColumn('id', F.monotonically_increasing_id()) before creating df1 and df2.
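Following up on that comment, here is a minimal sketch of one way to keep the two exploded columns aligned. It uses posexplode rather than the answer's exact approach, and assumes df already holds the array-typed Price and Product columns from the step above:

from pyspark.sql import functions as F

# add a per-row id once, before the columns are exploded
df_id = df.withColumn("id", F.monotonically_increasing_id())

# posexplode keeps each element's position, so the two sides can be
# matched back up on (id, pos) instead of two independently generated ids
df1 = df_id.select("id", F.posexplode("Price").alias("pos", "Price"))
df2 = df_id.select("id", F.posexplode("Product").alias("pos", "Product"))

df1.join(df2, ["id", "pos"]).select("Price", "Product").show()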

As an alternative to arrays_zip, we can also do the following:

First, read the JSON file into a DataFrame:
from pyspark.sql import functions as F

df = spark.read.json("your_json_file.json")
df.show(truncate=False)
+---------------------+------------------------------------------+
|Price                |Product                                   |
+---------------------+------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
+---------------------+------------------------------------------+

Next, expand the structs into arrays:

df = df.withColumn('prc_array', F.array(F.expr('Price.*')))
df = df.withColumn('prod_array', F.array(F.expr('Product.*')))

Then, create a map between the two arrays:

df = df.withColumn('prc_prod_map', F.map_from_arrays('prc_array', 'prod_array'))
df.select('prc_array', 'prod_array', 'prc_prod_map').show(truncate=False)
+---------------------+------------------------------------------+-----------------------------------------------------------------------+
|prc_array            |prod_array                                |prc_prod_map                                                           |
+---------------------+------------------------------------------+-----------------------------------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|[700 -> Desktop Computer, 250 -> Tablet, 800 -> iPhone, 1200 -> Laptop]|
+---------------------+------------------------------------------+-----------------------------------------------------------------------+

Finally, apply explode on the map:

df = df.select(F.explode('prc_prod_map').alias('prc', 'prod'))
df.show(truncate=False)
+----+----------------+
|prc |prod            |
+----+----------------+
|700 |Desktop Computer|
|250 |Tablet          |
|800 |iPhone          |
|1200|Laptop          |
+----+----------------+

This way, we avoid the potentially time-consuming join operation on two tables.

2 Comments

This is a nice answer, but it won't work if the keys are not unique or if any of the keys are null.
Duplicate keys aren't a problem for the mapping, but null keys might be an issue here. You may have to fill the missing values first.
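If null prices are a concern, here is a minimal sketch of that fill-first idea. It assumes Spark 2.4+ for the transform higher-order function, and the -1 placeholder is an arbitrary, hypothetical choice:

from pyspark.sql import functions as F

# replace any null prices before building the map, since map keys cannot be null
# (-1 is a placeholder; pick a value that cannot clash with real prices)
df = df.withColumn('prc_array', F.expr('transform(prc_array, x -> coalesce(x, -1))'))
df = df.withColumn('prc_prod_map', F.map_from_arrays('prc_array', 'prod_array'))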

In case you are using Spark < 2.4.4, the following gives an answer. However, because of the unusual JSON schema, I could not make it generic; in a real-life example, please create a better-formed JSON.

PYSPARK VERSION

>>> from pyspark.sql import Row
>>> json_df = spark.read.json("file.json")  # File in current directory
>>> json_df.show(20, False)  # We only have 1 Row with two StructType columns
+---------------------+------------------------------------------+
|Price                |Product                                   |
+---------------------+------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
+---------------------+------------------------------------------+
>>> # We convert the DataFrame to a Row and zip the two nested Rows, assuming there will be no gap in values
>>> spark.createDataFrame(zip(json_df.first().__getitem__(0), json_df.first().__getitem__(1)), schema=["Price", "Product"]).show(20, False)
+-----+----------------+
|Price|Product         |
+-----+----------------+
|700  |Desktop Computer|
|250  |Tablet          |
|800  |iPhone          |
|1200 |Laptop          |
+-----+----------------+

SCALA version (without the preferred case class method)

scala> val sparkDf = spark.read.json("file.json")
sparkDf: org.apache.spark.sql.DataFrame = [Price: struct<0: bigint, 1: bigint ... 2 more fields>, Product: struct<0: string, 1: string ... 2 more fields>]

scala> sparkDf.show(false)
+---------------------+------------------------------------------+
|Price                |Product                                   |
+---------------------+------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
+---------------------+------------------------------------------+

scala> import spark.implicits._
import spark.implicits._

scala> (sparkDf.first.getStruct(0).toSeq.asInstanceOf[Seq[Long]], sparkDf.first.getStruct(1).toSeq.asInstanceOf[Seq[String]]).zipped.toList.toDF("Price","Product")
res6: org.apache.spark.sql.DataFrame = [Price: bigint, Product: string]

scala> // We do the same thing, but use the methods of Row and Spark implicits to get a Dataset directly

scala> (sparkDf.first.getStruct(0).toSeq.asInstanceOf[Seq[Long]], sparkDf.first.getStruct(1).toSeq.asInstanceOf[Seq[String]]).zipped.toList.toDF("Price","Product").show(false)
+-----+----------------+
|Price|Product         |
+-----+----------------+
|700  |Desktop Computer|
|250  |Tablet          |
|800  |iPhone          |
|1200 |Laptop          |
+-----+----------------+
