
I'm having a hard time framing the following PySpark dataframe manipulation.

Essentially I am trying to group by category and then pivot/unmelt the subcategories and add new columns.

[Image: desired output table (not preserved)]

I've tried a number of ways, but they are very slow and do not leverage Spark's parallelism.

Here is my existing (slow, verbose) code:

    from pyspark.sql.functions import lit

    df = sqlContext.table('Table')

    # loop over category
    listids = [x[0] for x in df.select("category").distinct().collect()]
    dfArray = [df.where(df.category == x) for x in listids]

    for d in dfArray:
        # loop over subcategory
        listids_sub = [x[0] for x in d.select("sub_category").distinct().collect()]
        dfArraySub = [d.where(d.sub_category == x) for x in listids_sub]
        num = 1
        for b in dfArraySub:
            # renames all metric columns to append a number
            for c in b.columns:
                if c not in ['category', 'sub_category', 'date']:
                    column_name = str(c) + '_' + str(num)
                    b = b.withColumnRenamed(str(c), column_name)
            b = b.drop('sub_category')
            num += 1
            # if no df exists, create one and continually join new columns
            try:
                all_subs = all_subs.drop('sub_category').join(b.drop('sub_category'), on=['category', 'date'], how='left')
            except:
                all_subs = b
        # fixes missing columns on union
        try:
            try:
                diff_columns = list(set(all_cats.columns) - set(all_subs.columns))
                for d in diff_columns:
                    all_subs = all_subs.withColumn(d, lit(None))
                all_cats = all_cats.union(all_subs)
            except:
                diff_columns = list(set(all_subs.columns) - set(all_cats.columns))
                for d in diff_columns:
                    all_cats = all_cats.withColumn(d, lit(None))
                all_cats = all_cats.union(all_subs)
        except Exception as e:
            print(e)
            all_cats = all_subs

But this is very slow. Any guidance would be greatly appreciated!

  • What is the logic behind this? Why are the bed metrics supposed to be in sales_1 and not in sales_2? Commented Feb 22, 2018 at 16:46
  • Can you explain the logic behind your code? Commented Feb 22, 2018 at 18:45
  • I group by category and then collect into an array, essentially splitting the big df into a bunch of little dfs, each with just a single category. Then I do the same thing for these smaller dfs, but by sub category. It is very expensive because of the .collect(); this is much more pandas-esque code. I am happy to scrap the entire thing and go with a much more Spark-oriented approach, like the one you posted below. Commented Feb 22, 2018 at 19:00

1 Answer

Your output is not really logical, but we can achieve this result using the pivot function. You need to specify your rules more precisely; otherwise I can see a lot of cases where it may fail.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    df.show()
    +----------+---------+------------+------------+------------+
    |      date| category|sub_category|metric_sales|metric_trans|
    +----------+---------+------------+------------+------------+
    |2018-01-01|furniture|         bed|         100|          75|
    |2018-01-01|furniture|       chair|         110|          85|
    |2018-01-01|furniture|       shelf|          35|          30|
    |2018-02-01|furniture|         bed|          55|          50|
    |2018-02-01|furniture|       chair|          45|          40|
    |2018-02-01|furniture|       shelf|          10|          15|
    |2018-01-01|      rug|      circle|           2|           5|
    |2018-01-01|      rug|      square|           3|           6|
    |2018-02-01|      rug|      circle|           3|           3|
    |2018-02-01|      rug|      square|           4|           5|
    +----------+---------+------------+------------+------------+

    df.withColumn(
        "fg",
        F.row_number().over(Window.partitionBy('date', 'category').orderBy("sub_category"))
    ).groupBy('date', 'category').pivot('fg').sum('metric_sales', 'metric_trans').show()
    +----------+---------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
    |      date| category|1_sum(CAST(`metric_sales` AS BIGINT))|1_sum(CAST(`metric_trans` AS BIGINT))|2_sum(CAST(`metric_sales` AS BIGINT))|2_sum(CAST(`metric_trans` AS BIGINT))|3_sum(CAST(`metric_sales` AS BIGINT))|3_sum(CAST(`metric_trans` AS BIGINT))|
    +----------+---------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
    |2018-02-01|      rug|                                    3|                                    3|                                    4|                                    5|                                 null|                                 null|
    |2018-02-01|furniture|                                   55|                                   50|                                   45|                                   40|                                   10|                                   15|
    |2018-01-01|furniture|                                  100|                                   75|                                  110|                                   85|                                   35|                                   30|
    |2018-01-01|      rug|                                    2|                                    5|                                    3|                                    6|                                 null|                                 null|
    +----------+---------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
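The generated names such as 1_sum(CAST(`metric_sales` AS BIGINT)) are awkward to work with. One possible way to tidy them is sketched here; it is not part of the original answer. As far as I know, when each aggregate in a multi-aggregate pivot is given an alias, Spark names the pivoted columns `<pivot value>_<alias>`, so they can then be flipped to the metric_sales_1 style. The helper names `tidy_pivot_name` and `pivot_by_rank` are hypothetical, and the sketch assumes the column names from the sample data above.

```python
def tidy_pivot_name(col):
    """Turn a pivot-generated name like '1_metric_sales' into 'metric_sales_1'.

    Names without a leading '<number>_' prefix (e.g. 'date') are returned unchanged.
    """
    prefix, sep, rest = col.partition('_')
    if sep and prefix.isdigit():
        return rest + '_' + prefix
    return col


def pivot_by_rank(df, keys=('date', 'category'),
                  metrics=('metric_sales', 'metric_trans')):
    """Hypothetical helper: rank sub-categories within each key group,
    pivot on that rank, and rename the resulting columns.

    Assumes df has a 'sub_category' column and at least two metrics
    (with a single aggregate, Spark names the columns after the pivot
    value alone, so there would be nothing to flip).
    """
    # Imported here so tidy_pivot_name can be used without a Spark installation.
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = Window.partitionBy(*keys).orderBy('sub_category')
    # Aliasing each aggregate should give pivoted columns named '<rank>_<alias>'.
    aggs = [F.sum(m).alias(m) for m in metrics]
    wide = (df.withColumn('fg', F.row_number().over(w))
              .groupBy(*keys)
              .pivot('fg')
              .agg(*aggs))
    return wide.toDF(*[tidy_pivot_name(c) for c in wide.columns])
```

Calling `pivot_by_rank(df)` on the sample data should give columns like metric_sales_1, metric_trans_1, metric_sales_2 and so on. Because the pivot is on the rank within each (date, category) group rather than on the sub-category name, the table is only as wide as the largest number of sub-categories in any one category.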

6 Comments

Thank you so much for the prompt response. I agree that there are some failure scenarios. To further complicate things, some categories have varying numbers of subcategories. I am going to try this and will report back.
So this fails when I have 2 categories. For example, I could have a rugs category with square and circle sub-categories. Thoughts? I know it seems like an unorthodox transformation, but it is what I need.
I updated my original dataset to be more representative of the data I am working with, thanks.
Try replacing partitionBy('sub_category') with partitionBy('date', 'category', 'sub_category')
So the issue is that the pivoting changes the column names to bed_sum(CAST(metric_sales AS BIGINT)) instead of metric_sales_1. The table then grows really wide (a column for each pivoted sub_category), when I want the columns to look like my Excel output up top: stacked by metric 1, metric 2, etc.