How can I use `collect_set` or `collect_list` on a dataframe after a groupby? For example: `df.groupby('key').collect_set('values')`. I get an error: `AttributeError: 'GroupedData' object has no attribute 'collect_set'`
- Can you post some sample data that will throw this error so that we can debug your issue? – Katya Willard, Jun 2, 2016
- In pyspark it works fine. Btw, I am trying precisely to translate this work to Scala Spark: johnpaton.net/posts/forward-fill-spark (I mean the scope of the job is backfilling and forward filling, and this is how it works in pyspark). – Olfa2, Oct 12, 2022
2 Answers
You need to use `agg`. Example:

```python
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")
sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()
```

    +---+-----+-----+
    | id| code| name|
    +---+-----+-----+
    |  a| null| null|
    |  a|code1| null|
    |  a|code2|name2|
    +---+-----+-----+

Note that in the above you have to create a `HiveContext`. See https://stackoverflow.com/a/35529093/690430 for dealing with different Spark versions.

```python
(df
 .groupby("id")
 .agg(F.collect_set("code"), F.collect_list("name"))
 .show())
```

    +---+-----------------+------------------+
    | id|collect_set(code)|collect_list(name)|
    +---+-----------------+------------------+
    |  a|   [code1, code2]|           [name2]|
    +---+-----------------+------------------+
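A note on the semantics shown in that output: `collect_set` keeps only the unique non-null values in each group (with no guaranteed order), while `collect_list` keeps all non-null values in the order encountered. A plain-Python sketch of that grouping, using the same three rows as the example above:

```python
from collections import defaultdict

# Same rows as the Spark example above; None plays the role of SQL null.
rows = [
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
]

codes = defaultdict(set)   # like collect_set: unique non-null values per key
names = defaultdict(list)  # like collect_list: all non-null values per key

for id_, code, name in rows:
    if code is not None:   # both aggregates skip nulls
        codes[id_].add(code)
    if name is not None:
        names[id_].append(name)

print(dict(codes))  # {'a': {'code1', 'code2'}}
print(dict(names))  # {'a': ['name2']}
```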
If your dataframe is large, you can try using a pandas UDF (`GROUPED_AGG`) to avoid memory errors. It is also much faster.

> Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and `pyspark.sql.Window`. It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series` represents a column within the group or window. (From the Spark pandas UDF documentation.)
example:
```python
import pyspark.sql.functions as F

@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    # Receives the group's values as a pandas.Series;
    # returns a single comma-separated string per group.
    return ', '.join(name)

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))
```
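Because the UDF body just receives a `pandas.Series` of one group's column values and returns a scalar, you can sanity-check the aggregation logic without a Spark session. A minimal sketch (the plain function and sample values here are illustrative, not from the answer above):

```python
import pandas as pd

def collect_list_body(name):
    # Same logic as the UDF body above: join the group's values with ", "
    return ', '.join(name)

print(collect_list_body(pd.Series(["name2"])))        # name2
print(collect_list_body(pd.Series(["a", "b", "a"])))  # a, b, a
```

Note that `', '.join(...)` assumes the values are strings and non-null; nulls in the column would need to be dropped or filled first.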