
I would like to combine rows in the following spark dataframe:

+-------+-------------+--------+
| date  | description | amount |
+-------+-------------+--------+
| 01/10 | first       | 10     |
| null  | second      | null   |
| null  | third       | null   |
| 02/10 | first       | 14     |
| 03/10 | third       | 12     |
| null  | third       | null   |
| null  | second      | null   |
| 04/10 | first       | 15     |
+-------+-------------+--------+

so that descriptions spanning multiple rows are combined into the single row that carries the date. The result would look like:

+-------+----------------------+--------+
| date  | description          | amount |
+-------+----------------------+--------+
| 01/10 | first, second, third | 10     |
| 02/10 | first                | 14     |
| 03/10 | third, third, second | 12     |
| 04/10 | first                | 15     |
+-------+----------------------+--------+

The null rows don't have any identifier linking them to the correct date row, other than that they always appear on the rows immediately below it and are null in every column except description.
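For anyone who wants to try this out, the input above can be rebuilt with something along these lines (the column types are my assumption; the amounts could just as well be strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = [("01/10", "first", 10), (None, "second", None), (None, "third", None),
        ("02/10", "first", 14), ("03/10", "third", 12), (None, "third", None),
        (None, "second", None), ("04/10", "first", 15)]
df = spark.createDataFrame(data, ["date", "description", "amount"])
df.show()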

Thanks!!

2 Answers


First of all, replace your null values with the previous row's good value:

df = df.withColumn("good_date", func.last('date', True).over(Window.rowsBetween(-sys.maxsize, 0))) 

After this, use the collect_list function:

grouped_df = df.groupby('good_date').agg(func.collect_list('description').alias("description_list")) 

Then use concat_ws to concatenate the values of the list collected in the previous step:

final_df = grouped_df.withColumn("description", func.concat_ws(", ", "description_list")) 

Remember, the last function is non-deterministic because its result depends on the order of the rows, which may change after a shuffle. It's better to use it with Window.partitionBy(), partitioning df on columns so that the row order is not affected by shuffling transformations like groupByKey, reduceByKey, join, union, etc. If your code does not have transformations where Spark shuffles data, then you are good to go.
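For completeness, here is a sketch of the whole pipeline with an explicit ordering column. This part is my assumption, not the answer's code: monotonically_increasing_id stands in for a real ordering column, since a DataFrame has no guaranteed row order.

from pyspark.sql import Window
from pyspark.sql import functions as func

# Pin the row order with an index column (assumes the DataFrame still reflects the source order).
df = df.withColumn("idx", func.monotonically_increasing_id())
w = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Forward-fill date and amount, then collect the descriptions per date.
filled = (df.withColumn("good_date", func.last("date", True).over(w))
            .withColumn("good_amount", func.last("amount", True).over(w)))

final_df = (filled.groupby("good_date")
                  .agg(func.first("good_amount").alias("amount"),
                       func.concat_ws(", ", func.collect_list("description")).alias("description")))

As noted above, the order inside collect_list is still not guaranteed after a shuffle.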


1 Comment

Thanks for your answer, Drashti. I cannot get the first line to replace all the null values with the previous valid values; my error suggests I need to "Wrap '(last(date, true) AS date)' in windowing function".

I think it's possible only if you manage to get an ID column, so that we can use a window function:

>>> data = (['01/10', 'first', '10', 1], ['', 'second', '', 1], ['', 'third', '', 1], ['02/10', 'first', '14', 2], ['03/10', 'third', '12', 3], ['', 'third', '', 3], ['', 'second', '', 3], ['04/10', 'first', '15', 4])
>>> df = spark.createDataFrame(data, schema=['date', 'description', 'amount', 'id'])
>>> df.show()
+-----+-----------+------+---+
| date|description|amount| id|
+-----+-----------+------+---+
|01/10|      first|    10|  1|
|     |     second|      |  1|
|     |      third|      |  1|
|02/10|      first|    14|  2|
|03/10|      third|    12|  3|
|     |      third|      |  3|
|     |     second|      |  3|
|04/10|      first|    15|  4|
+-----+-----------+------+---+
>>> date_df = df.groupBy('id').agg(func.collect_set('date').alias('date'))
>>> amount_df = df.groupBy('id').agg(func.collect_set('amount').alias('amount'))
>>> description_df = df.groupBy('id').agg(func.collect_set('description').alias('description'))
>>> all_df = df.join(date_df, ['id']).join(amount_df, ['id']).join(description_df, ['id'])
>>> all_df.distinct().show(10, False)
+---+------+---------+----------------------+
|id |amount|date     |description           |
+---+------+---------+----------------------+
|1  |[, 10]|[01/10, ]|[third, second, first]|
|3  |[12, ]|[03/10, ]|[third, second]       |
|2  |[14]  |[02/10]  |[first]               |
|4  |[15]  |[04/10]  |[first]               |
+---+------+---------+----------------------+
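If the goal is the comma-separated strings from the question, the collected sets can be flattened further. Here is a sketch of one way to do it, assuming Spark 2.4+ for array_remove and joining only the aggregated frames to avoid duplicate column names; note that collect_set also de-duplicates, so the repeated "third" in the expected output would collapse to one entry.

from pyspark.sql import functions as func

# Join the aggregated frames, drop the '' placeholders, and flatten the sets into strings.
combined = (date_df.join(description_df, ['id']).join(amount_df, ['id'])
            .withColumn('date', func.concat_ws('', func.array_remove('date', '')))
            .withColumn('description', func.concat_ws(', ', func.array_remove('description', '')))
            .withColumn('amount', func.concat_ws('', func.array_remove('amount', ''))))
combined.orderBy('id').show(10, False)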

1 Comment

Thanks for the response. Unfortunately, I have a large dataset which doesn't have any way of distinguishing the id. I would have to write an operation to create an id column that increments every time the date field is valid. Can you suggest how this would be done?
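One way to build such an id, as a sketch rather than anything from the thread: take a running count of the non-null dates over a window ordered by an index column, assuming that index preserves the original row order.

from pyspark.sql import Window
from pyspark.sql import functions as func

# Assumption: monotonically_increasing_id() reflects the source row order well enough to order by.
df = df.withColumn("idx", func.monotonically_increasing_id())
w = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Cumulative count of rows with a valid date gives 1,1,1,2,3,3,3,4 for the sample data.
# Adjust the predicate if missing dates are empty strings rather than nulls.
df = df.withColumn("id", func.sum(func.col("date").isNotNull().cast("int")).over(w))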
