
I have a problem at hand wherein I want to unpivot a table in Spark SQL / PySpark. I have gone through the documentation and I can see there is support only for pivot, but no support for unpivot so far. Is there a way I can achieve this?

Let my initial table look like this:

[image: the initial table, with columns A, B and C]

When I pivot this in PySpark:

df.groupBy("A").pivot("B").sum("C") 

I get this as the output:

[image: the pivoted table, with columns A, X, Y and Z]

Now I want to unpivot the pivoted table. In general, this operation may or may not yield the original table, depending on how I pivoted the original table. For example (values here are made up purely for illustration), if two rows share the same (A, B) pair, the sum aggregation collapses them, so unpivoting cannot recover them:
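# Illustrative sketch only - the data and the name "orig" are made up.
orig = spark.createDataFrame(
    [("G", "X", 1), ("G", "X", 3), ("G", "Y", 2)],
    ["A", "B", "C"],
)
orig.groupBy("A").pivot("B").sum("C").show()
# +---+---+---+
# |  A|  X|  Y|
# +---+---+---+
# |  G|  4|  2|
# +---+---+---+
# The two (G, X, *) rows were summed into one, so unpivoting gives back
# (G, X, 4) rather than the two original rows.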

Spark SQL, as of now, doesn't provide out-of-the-box support for unpivot. Is there a way I can achieve this?


2 Answers


You can use the built-in stack function, for example in Scala:

scala> val df = Seq(("G",Some(4),2,None),("H",None,4,Some(5))).toDF("A","X","Y", "Z")
df: org.apache.spark.sql.DataFrame = [A: string, X: int ... 2 more fields]

scala> df.show
+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+

scala> df.select($"A", expr("stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)")).where("C is not null").show
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  G|  X|  4|
|  G|  Y|  2|
|  H|  Y|  4|
|  H|  Z|  5|
+---+---+---+

Or in pyspark:

In [1]: df = spark.createDataFrame([("G",4,2,None),("H",None,4,5)],list("AXYZ"))

In [2]: df.show()
+---+----+---+----+
|  A|   X|  Y|   Z|
+---+----+---+----+
|  G|   4|  2|null|
|  H|null|  4|   5|
+---+----+---+----+

In [3]: df.selectExpr("A", "stack(3, 'X', X, 'Y', Y, 'Z', Z) as (B, C)").where("C is not null").show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  G|  X|  4|
|  G|  Y|  2|
|  H|  Y|  4|
|  H|  Z|  5|
+---+---+---+
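The same stack() expression also works in plain Spark SQL. A rough sketch (the temp view name "pivoted" is only for illustration); the subquery is needed because C can't be referenced in a WHERE clause of the same SELECT that produces it:

# Rough sketch - the view name "pivoted" is assumed, not part of the answer.
df.createOrReplaceTempView("pivoted")
spark.sql("""
    SELECT A, B, C
    FROM (
        SELECT A, stack(3, 'X', X, 'Y', Y, 'Z', Z) AS (B, C)
        FROM pivoted
    ) t
    WHERE C IS NOT NULL
""").show()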

Comments

I tried using the PySpark code given here, but its performance seems to be bad. Using a union all query to achieve the unpivot gave me better performance compared to this code. Are there any tweaks that we can do here to improve the performance?
Awesome. Could you please send a link to the stack() documentation? I can't find it.
It's the same usage as in Hive: cwiki.apache.org/confluence/display/Hive/…
Are you sure this is the result? Writing 'as (B, C)' does not work since it expects 1 column as the output. My result is a single column with all the values, i.e. the C column; the B column is missing. What am I doing wrong?
This is interesting - I have been wondering about a function to unpivot dataframes, and it's the first time I've seen something clean. For people interested in benchmarking this, don't forget the solution consisting of explode(array({struct(<col name>,<col val>)}*)) - a sketch of that approach is below.
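A rough PySpark sketch of that explode(array(struct(...))) alternative, using the column names X, Y, Z from the example above (illustration only, not part of the original answer):

from pyspark.sql import functions as F

cols = ["X", "Y", "Z"]  # columns to unpivot, per the example above
unpivoted = (
    df.select(
        "A",
        F.explode(F.array(*[
            # one struct per column: (column name, column value);
            # cast the value columns to a common type first if they differ
            F.struct(F.lit(c).alias("B"), F.col(c).alias("C")) for c in cols
        ])).alias("kv"),
    )
    .select("A", "kv.B", "kv.C")
    .where("C is not null")
)
unpivoted.show()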

Spark 3.4+

df = df.melt(['A'], ['X', 'Y', 'Z'], 'B', 'C')
# OR
df = df.unpivot(['A'], ['X', 'Y', 'Z'], 'B', 'C')
+---+---+----+
|  A|  B|   C|
+---+---+----+
|  G|  Y|   2|
|  G|  Z|null|
|  G|  X|   4|
|  H|  Y|   4|
|  H|  Z|   5|
|  H|  X|null|
+---+---+----+

To filter out nulls: df = df.filter("C is not null")
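
Spark 3.4 also added an UNPIVOT clause to Spark SQL. A rough sketch (the names "pivoted_df" and the temp view "pivoted" are assumed for illustration; check the INCLUDE/EXCLUDE NULLS option, since the SQL clause's null handling may differ from melt()):

# Rough sketch - "pivoted_df" is the wide DataFrame before unpivoting.
pivoted_df.createOrReplaceTempView("pivoted")
spark.sql("""
    SELECT *
    FROM pivoted
    UNPIVOT (C FOR B IN (X, Y, Z))
""").show()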


Spark 3.3 and below

to_melt = {'X', 'Y', 'Z'}
new_names = ['B', 'C']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
).filter(f"!{new_names[1]} is null")

Full test:

from pyspark.sql import functions as F

df = spark.createDataFrame([("G", 4, 2, None), ("H", None, 4, 5)], list("AXYZ"))

to_melt = {'X', 'Y', 'Z'}
new_names = ['B', 'C']

melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
).filter(f"!{new_names[1]} is null")

df.show()
# +---+---+---+
# |  A|  B|  C|
# +---+---+---+
# |  G|  Y|  2|
# |  G|  X|  4|
# |  H|  Y|  4|
# |  H|  Z|  5|
# +---+---+---+

