You can add a column that numbers the rows within each job (i.e. 1, 2, 3 for that job's records), then use it as the pivot column with two aggregations: one for trouble_code and one for trouble_status.
# Number each record within its job (1, 2, 3, ...) and pivot on that number,
# producing one trouble_code and one trouble_status column per position.
#
# Ordering the window by a constant (func.lit(1)) leaves the numbering to
# whatever order Spark happens to see rows in after the shuffle, so a record's
# position can change between runs. Capturing the incoming row order with
# monotonically_increasing_id() first gives the window an explicit sort key.
# NOTE(review): monotonically_increasing_id() follows the current partition
# layout; if the data has a real ordering column (e.g. a timestamp), order by
# that instead.
pivot_data_sdf = (
    data_sdf
    .withColumn('_row_order', func.monotonically_increasing_id())
    .withColumn(
        'rn',
        func.row_number().over(wd.partitionBy('job').orderBy('_row_order')),
    )
    # groupBy keeps only 'job' plus the aggregates, so the helper
    # '_row_order' column does not leak into the result.
    .groupBy('job')
    .pivot('rn')
    .agg(
        # Each (job, rn) pair holds exactly one row, so first() just picks it.
        func.first('trouble_code').alias('trouble_code'),
        func.first('trouble_status').alias('trouble_status'),
    )
)

# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# | job|1_trouble_code|1_trouble_status|2_trouble_code|2_trouble_status|3_trouble_code|3_trouble_status|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
# |yyyy|            aa|           close|            bb|            open|            cc|            open|
# |xxxx|            aa|            open|            bb|            open|            cc|           close|
# +----+--------------+----------------+--------------+----------------+--------------+----------------+
Then rename the trouble_* columns so the pivot number becomes a suffix (e.g. 1_trouble_code → trouble_code_1).
# function takes column name and renames it with the number at the end def col_rename(sdfcolname): colsplit = sdfcolname.split('_') rearr_colsplit = colsplit[1:3] + [colsplit[0]] new_sdfcolname = '_'.join(rearr_colsplit) return new_sdfcolname pivot_data_sdf. \ select(*[func.col(k).alias(col_rename(k)) if 'trouble'in k else k for k in pivot_data_sdf.columns]). \ show() # +----+--------------+----------------+--------------+----------------+--------------+----------------+ # | job|trouble_code_1|trouble_status_1|trouble_code_2|trouble_status_2|trouble_code_3|trouble_status_3| # +----+--------------+----------------+--------------+----------------+--------------+----------------+ # |yyyy| aa| close| bb| open| cc| open| # |xxxx| aa| open| bb| open| cc| close| # +----+--------------+----------------+--------------+----------------+--------------+----------------+