I'd like to create a pyspark dataframe from a json file in hdfs.
the json file has the following contet:
{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" }, "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }
Then, I read this file using pyspark 2.4.4 df = spark.read.json("/path/file.json")
So, I get a result like this:
df.show(truncate=False) +---------------------+---------------------------------+ |Price |Product | +---------------------+---------------------------------+ |[700, 250, 800, 1200]|[Desktop, Tablet, Iphone, Laptop]| +---------------------+---------------------------------+ But I'd like a dataframe with the following structure:
+-------+--------+ |Price |Product | +-------+--------+ |700 |Desktop | |250 |Tablet | |800 |Iphone | |1200 |Laptop | +-------+--------+ How can I get a dataframe with the prevvious structure using pyspark?
I tried to use explode df.select(explode("Price")) but I got the following error:
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: /usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: Py4JJavaError: An error occurred while calling o688.select. : org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;; 'Project [explode(Price#107) AS List()] +- LogicalRDD [Price#107, Product#108], false at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:97) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3301) at org.apache.spark.sql.Dataset.select(Dataset.scala:1312) at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) During handling of the above exception, another exception occurred: AnalysisException Traceback (most recent call last) <ipython-input-46-463397adf153> in <module> ----> 1 df.select(explode("Price")) /usr/lib/spark/python/pyspark/sql/dataframe.py in select(self, *cols) 1200 [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)] 1201 """ -> 1202 jdf = self._jdf.select(self._jcols(*cols)) 1203 return DataFrame(jdf, self.sql_ctx) 1204 /usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args: /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 67 e.java_exception.getStackTrace())) 68 if s.startswith('org.apache.spark.sql.AnalysisException: '): ---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace) 70 if s.startswith('org.apache.spark.sql.catalyst.analysis'): 71 raise AnalysisException(s.split(': ', 1)[1], stackTrace) AnalysisException: "cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;\n'Project [explode(Price#107) AS List()]\n+- LogicalRDD [Price#107, Product#108], false\n"