
I am new to Spark and want to pivot a PySpark dataframe on multiple columns. There is a single row for each distinct (date, rank) combination. The rows should be flattened such that there is one row per unique date.

import pyspark.sql.functions as F
from datetime import datetime

data = [
    (datetime(2021, 8, 4, 13, 0), 1, 22, "a"),
    (datetime(2021, 8, 4, 13, 0), 2, 14, "a"),
    (datetime(2021, 8, 4, 13, 0), 3, 9, "a"),
    (datetime(2021, 8, 4, 13, 0), 4, 7, "a"),
    (datetime(2021, 8, 4, 14, 0), 1, 16, "b"),
    (datetime(2021, 8, 4, 14, 0), 2, 21, "b"),
    (datetime(2021, 8, 4, 14, 0), 3, 17, "b"),
    (datetime(2021, 8, 4, 14, 0), 4, 18, "b"),
    (datetime(2021, 8, 4, 15, 0), 1, 19, "a"),
    (datetime(2021, 8, 4, 15, 0), 2, 9, "b"),
    (datetime(2021, 8, 4, 15, 0), 3, 10, "c"),
    (datetime(2021, 8, 4, 15, 0), 4, 13, "d"),
]
columns = ["date", "rank", "feat1", "feat2"]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)

+-------------------+----+-----+-----+
|date               |rank|feat1|feat2|
+-------------------+----+-----+-----+
|2021-08-04 13:00:00|1   |22   |a    |
|2021-08-04 13:00:00|2   |14   |a    |
|2021-08-04 13:00:00|3   |9    |a    |
|2021-08-04 13:00:00|4   |7    |a    |
|2021-08-04 14:00:00|1   |16   |b    |
|2021-08-04 14:00:00|2   |21   |b    |
|2021-08-04 14:00:00|3   |17   |b    |
|2021-08-04 14:00:00|4   |18   |b    |
|2021-08-04 15:00:00|1   |19   |a    |
|2021-08-04 15:00:00|2   |9    |b    |
|2021-08-04 15:00:00|3   |10   |c    |
|2021-08-04 15:00:00|4   |13   |d    |
+-------------------+----+-----+-----+

The real data has 30+ feature columns, and the ranks go from 1 to 100 for each date. The desired output:

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|               date|rank1_feat1|rank2_feat1|rank3_feat1|rank4_feat1|rank1_feat2|rank2_feat2|rank3_feat2|rank4_feat2|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2021-08-04 15:00:00|         19|          9|         10|         13|          a|          b|          c|          d|
|2021-08-04 13:00:00|         22|         14|          9|          7|          a|          a|          a|          a|
|2021-08-04 14:00:00|         16|         21|         17|         18|          b|          b|          b|          b|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+

I have a solution that appears to work for my trivial example, but the memory usage is so extreme that I can't use even 1/500th of my data without getting memory errors.

dfspine = df.select("date").distinct()

for col in df.columns:
    if col not in ["date", "rank"]:
        piv = df.groupby("date").pivot("rank").agg(F.first(col))
        mapping = dict([(pivcol, "rank%s_%s" % (pivcol, col))
                        for pivcol in piv.columns if pivcol not in ["date"]])
        piv = piv.select([F.col(c).alias(mapping.get(c, c)) for c in piv.columns])
        dfspine = dfspine.join(piv, how="left", on="date")
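A single-pass variant of the same idea (a sketch, not tested at scale) would pivot once with several aggregate expressions instead of running one pivot plus join per feature column. With multiple aggregates Spark names the pivoted columns <rankValue>_<alias>, e.g. 1_feat1, so a rename is still needed afterwards:

feature_cols = [c for c in df.columns if c not in ["date", "rank"]]

# One pivot over all feature columns at once.
piv = (
    df.groupBy("date")
      .pivot("rank")
      .agg(*[F.first(c).alias(c) for c in feature_cols])
)

# Prepend "rank" to the generated "<rankValue>_<alias>" names,
# e.g. "1_feat1" -> "rank1_feat1".
piv = piv.select(
    "date",
    *[F.col(c).alias("rank" + c) for c in piv.columns if c != "date"]
)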
  • check this, stackoverflow.com/questions/45035940/… Commented Dec 17, 2021 at 6:14
  • Then you want to create 3000 columns, right? Why not just use a struct column? Commented Dec 17, 2021 at 8:08
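A minimal sketch of the struct/map idea from the second comment, reusing df and the F alias from the question (the by_rank column name is just for illustration): instead of widening to thousands of columns, keep one row per date with a map keyed by rank.

nested = df.groupBy("date").agg(
    F.map_from_entries(
        F.collect_list(
            F.struct(
                F.col("rank").alias("key"),
                F.struct("feat1", "feat2").alias("value"),
            )
        )
    ).alias("by_rank")
)

# Individual values are still reachable without a flat schema:
nested.select("date", F.col("by_rank")[1]["feat1"].alias("rank1_feat1")).show()

Whether this fits depends on what consumes the result; if downstream code expects flat feature columns, the wide layout from the answer below is still needed.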

1 Answer


In pandas this is fairly trivial: it's a melt followed by a pivot. In PySpark it's a little more difficult due to the lack of a built-in melt. Fortunately this post has a solution for recreating melt in PySpark, which we can base our approach on.

Pandas version:

import pandas as pd

pdf = pd.DataFrame({
    'date': ['2021-08-04 13:00:00', '2021-08-04 13:00:00', '2021-08-04 13:00:00', '2021-08-04 13:00:00',
             '2021-08-04 14:00:00', '2021-08-04 14:00:00', '2021-08-04 14:00:00', '2021-08-04 14:00:00',
             '2021-08-04 15:00:00', '2021-08-04 15:00:00', '2021-08-04 15:00:00', '2021-08-04 15:00:00'],
    'rank': [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4],
    'feat1': [22, 14, 9, 7, 16, 21, 17, 18, 19, 9, 10, 13],
    'feat2': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'a', 'b', 'c', 'd']
})

pdf = pdf.melt(id_vars=['date', 'rank'])
pdf['col'] = 'rank' + pdf['rank'].astype(str) + '_' + pdf['variable']
pdf.pivot(index='date', columns='col', values='value')

PySpark version:

import pandas as pd
from pyspark.sql.functions import array, col, explode, lit, struct, concat, first
from pyspark.sql import DataFrame
from typing import Iterable


def melt(
        df: DataFrame,
        id_vars: Iterable[str],
        value_vars: Iterable[str],
        var_name: str = "variable",
        value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)


pdf = pd.DataFrame({
    'date': ['2021-08-04 13:00:00', '2021-08-04 13:00:00', '2021-08-04 13:00:00', '2021-08-04 13:00:00',
             '2021-08-04 14:00:00', '2021-08-04 14:00:00', '2021-08-04 14:00:00', '2021-08-04 14:00:00',
             '2021-08-04 15:00:00', '2021-08-04 15:00:00', '2021-08-04 15:00:00', '2021-08-04 15:00:00'],
    'rank': [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4],
    'feat1': [22, 14, 9, 7, 16, 21, 17, 18, 19, 9, 10, 13],
    'feat2': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'a', 'b', 'c', 'd']
})

sdf = spark.createDataFrame(pdf)
sdf = melt(sdf, id_vars=['date', 'rank'], value_vars=['feat1', 'feat2'])
sdf = sdf.withColumn('rank', concat(lit('rank'), col('rank'), lit('_'), col('variable')))
sdf = sdf.groupby('date').pivot('rank').agg(first(col('value')))
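For the real data (ranks 1 to 100 and 30+ features, so roughly 3000 pivoted columns) it may be worth replacing the final groupby/pivot line with a version that passes the pivot values explicitly, since Spark can then skip the extra job it otherwise runs to discover the distinct values. A sketch using the example's ranks and features:

# Precompute the target column names; the real data would use
# range(1, 101) and the full feature list.
pivot_values = ["rank%d_%s" % (r, f)
                for r in range(1, 5)
                for f in ["feat1", "feat2"]]

sdf = sdf.groupby("date").pivot("rank", pivot_values).agg(first(col("value")))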

Output

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|               date|rank1_feat1|rank1_feat2|rank2_feat1|rank2_feat2|rank3_feat1|rank3_feat2|rank4_feat1|rank4_feat2|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2021-08-04 13:00:00|         22|          a|         14|          a|          9|          a|          7|          a|
|2021-08-04 14:00:00|         16|          b|         21|          b|         17|          b|         18|          b|
|2021-08-04 15:00:00|         19|          a|          9|          b|         10|          c|         13|          d|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
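If you are on Spark 3.4 or later, the hand-rolled melt should also be replaceable with the built-in DataFrame.unpivot (exposed under the melt alias as well). A rough sketch, reusing pdf and the imports from the block above; unpivot requires the value columns to share a common type, so the numeric feature is cast to string up front:

sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('feat1', col('feat1').cast('string'))

# Built-in replacement for the custom melt()
sdf = sdf.unpivot(
    ids=['date', 'rank'],
    values=['feat1', 'feat2'],
    variableColumnName='variable',
    valueColumnName='value',
)

sdf = sdf.withColumn('rank', concat(lit('rank'), col('rank'), lit('_'), col('variable')))
sdf = sdf.groupby('date').pivot('rank').agg(first(col('value')))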