I'm having a hard time expressing the following PySpark DataFrame manipulation.
Essentially I am trying to group by category, then pivot/unmelt the subcategories into new numbered columns.
I've tried a number of approaches, but they are very slow and don't leverage Spark's parallelism.
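
To make the target shape concrete, here is a toy example of roughly what the input looks like and what I want to end up with (the column names and values below are made-up stand-ins; my real table has more value columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One row per (category, sub_category, date), plus some value columns.
toy_input = spark.createDataFrame(
    [
        ("A", "s1", "2017-01-01", 10, 1.5),
        ("A", "s2", "2017-01-01", 20, 2.5),
        ("B", "s1", "2017-01-01", 30, 3.5),
    ],
    ["category", "sub_category", "date", "qty", "price"],
)

# Desired output: one row per (category, date), with each sub_category's
# value columns pulled out into numbered columns:
#
# category | date       | qty_1 | price_1 | qty_2 | price_2
# A        | 2017-01-01 | 10    | 1.5     | 20    | 2.5
# B        | 2017-01-01 | 30    | 3.5     | null  | null
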
Here is my existing (slow, verbose) code:
from pyspark.sql.functions import lit

df = sqlContext.table('Table')

# loop over category
listids = [x.asDict().values()[0] for x in df.select("category").distinct().collect()]
dfArray = [df.where(df.category == x) for x in listids]

for d in dfArray:
    # loop over subcategory
    listids_sub = [x.asDict().values()[0] for x in d.select("sub_category").distinct().collect()]
    dfArraySub = [d.where(d.sub_category == x) for x in listids_sub]

    num = 1
    for b in dfArraySub:
        # rename all value columns to append the subcategory number
        for c in b.columns:
            if c not in ['category', 'sub_category', 'date']:
                b = b.withColumnRenamed(str(c), str(c) + '_' + str(num))
        b = b.drop('sub_category')
        num += 1

        # if no df exists yet, create one; otherwise keep joining on the new columns
        try:
            all_subs = all_subs.join(b, on=['category', 'date'], how='left')
        except:
            all_subs = b

    # fix missing columns before the union
    try:
        try:
            diff_columns = list(set(all_cats.columns) - set(all_subs.columns))
            for missing in diff_columns:
                all_subs = all_subs.withColumn(missing, lit(None))
            all_cats = all_cats.union(all_subs)
        except:
            diff_columns = list(set(all_subs.columns) - set(all_cats.columns))
            for missing in diff_columns:
                all_cats = all_cats.withColumn(missing, lit(None))
            all_cats = all_cats.union(all_subs)
    except Exception as e:
        print(e)
        all_cats = all_subs

But this is very slow. Any guidance would be greatly appreciated!
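
I suspect groupBy().pivot() is the idiomatic way to do this in one pass instead of looping, and I've been sketching something like the snippet below. It is untested against my real data, first() is only a stand-in aggregate on the assumption that each (category, sub_category, date) has at most one row, and it keys the new columns by sub_category value rather than by the sequential numbering my loop produces, so I'm not sure it's the right direction:

from pyspark.sql import functions as F

# Everything except the grouping keys gets pivoted out.
value_cols = [c for c in df.columns if c not in ('category', 'sub_category', 'date')]

pivoted = (
    df.groupBy('category', 'date')
      .pivot('sub_category')
      .agg(*[F.first(c).alias(c) for c in value_cols])
)
# Result columns come out as '<sub_category>_<col>' (e.g. 's1_qty'),
# not the '<col>_1', '<col>_2' numbering my loop produces.

Is a pivot like this the right way to go, and if so, how do I get from there to the numbered-column layout efficiently?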
