I am new to Spark and want to pivot a PySpark dataframe on multiple columns. There is a single row for each distinct (date, rank) combination. The rows should be flattened such that there is one row per unique date.
    import pyspark.sql.functions as F
    from datetime import datetime

    data = [
        (datetime(2021,8,4,13,0), 1, 22, "a"), (datetime(2021,8,4,13,0), 2, 14, "a"), (datetime(2021,8,4,13,0), 3, 9, "a"), (datetime(2021,8,4,13,0), 4, 7, "a"),
        (datetime(2021,8,4,14,0), 1, 16, "b"), (datetime(2021,8,4,14,0), 2, 21, "b"), (datetime(2021,8,4,14,0), 3, 17, "b"), (datetime(2021,8,4,14,0), 4, 18, "b"),
        (datetime(2021,8,4,15,0), 1, 19, "a"), (datetime(2021,8,4,15,0), 2, 9, "b"), (datetime(2021,8,4,15,0), 3, 10, "c"), (datetime(2021,8,4,15,0), 4, 13, "d"),
    ]
    columns = ["date", "rank", "feat1", "feat2"]
    df = spark.createDataFrame(data=data, schema=columns)
    df.show(truncate=False)

    +-------------------+----+-----+-----+
    |date               |rank|feat1|feat2|
    +-------------------+----+-----+-----+
    |2021-08-04 13:00:00|1   |22   |a    |
    |2021-08-04 13:00:00|2   |14   |a    |
    |2021-08-04 13:00:00|3   |9    |a    |
    |2021-08-04 13:00:00|4   |7    |a    |
    |2021-08-04 14:00:00|1   |16   |b    |
    |2021-08-04 14:00:00|2   |21   |b    |
    |2021-08-04 14:00:00|3   |17   |b    |
    |2021-08-04 14:00:00|4   |18   |b    |
    |2021-08-04 15:00:00|1   |19   |a    |
    |2021-08-04 15:00:00|2   |9    |b    |
    |2021-08-04 15:00:00|3   |10   |c    |
    |2021-08-04 15:00:00|4   |13   |d    |
    +-------------------+----+-----+-----+

The real data has 30+ feature columns, and the ranks go from 1 to 100 for each date. The desired output:

    +-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
    |               date|rank1_feat1|rank2_feat1|rank3_feat1|rank4_feat1|rank1_feat2|rank2_feat2|rank3_feat2|rank4_feat2|
    +-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
    |2021-08-04 15:00:00|         19|          9|         10|         13|          a|          b|          c|          d|
    |2021-08-04 13:00:00|         22|         14|          9|          7|          a|          a|          a|          a|
    |2021-08-04 14:00:00|         16|         21|         17|         18|          b|          b|          b|          b|
    +-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+

I have a solution that appears to work for my trivial example, but the memory usage is so extreme that I can't use even 1/500th of my data without getting memory errors.

    dfspine = df.select("date").distinct()
    for col in df.columns:
        if col not in ["date", "rank"]:
            piv = df.groupby("date").pivot("rank").agg(F.first(col))
            mapping = dict([(pivcol, "rank%s_%s" % (pivcol, col)) for pivcol in piv.columns if pivcol not in ["date"]])
            piv = piv.select([F.col(c).alias(mapping.get(c, c)) for c in piv.columns])
            dfspine = dfspine.join(piv, how="left", on="date")