36

I have this data frame

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(["store", "values"])

+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

and I would like to convert it into the following df:

+-----+------------------+
|store|            values|
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+

I did this:

from pyspark.sql import functions as F

df.groupBy("store").agg(F.collect_list("values"))

but the result contains WrappedArrays:

+-----+----------------------------------------------+
|store|collect_list(values)                          |
+-----+----------------------------------------------+
|1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
|2    |[WrappedArray(2), WrappedArray(3)]            |
+-----+----------------------------------------------+

Is there any way to transform the WrappedArrays into concatenated arrays? Or can I do it differently?


6 Answers

29

You need a flattening UDF; starting from your own df:

spark.version  # u'2.2.0'

from pyspark.sql import functions as F
import pyspark.sql.types as T

def fudf(val):
    return reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))

df2 = df.groupBy("store").agg(F.collect_list("values"))
df2.show(truncate=False)
# +-----+----------------------------------------------+
# |store|collect_list(values)                          |
# +-----+----------------------------------------------+
# |1    |[WrappedArray(1, 2, 3), WrappedArray(4, 5, 6)]|
# |2    |[WrappedArray(2), WrappedArray(3)]            |
# +-----+----------------------------------------------+

df3 = df2.select("store", flattenUdf("collect_list(values)").alias("values"))
df3.show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

UPDATE (after comment):

The above snippet will work only with Python 2. With Python 3, you should modify the UDF as follows:

import functools

def fudf(val):
    return functools.reduce(lambda x, y: x + y, val)
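For completeness, here is a minimal end-to-end sketch of the Python 3 variant, assuming the same df as in the question; names such as df3 are just illustrative:

# Minimal Python 3 sketch of the same UDF approach (assumes the `df` from the question).
import functools

from pyspark.sql import functions as F
import pyspark.sql.types as T

def fudf(val):
    # Concatenate the collected list of arrays into one flat list.
    return functools.reduce(lambda x, y: x + y, val)

flattenUdf = F.udf(fudf, T.ArrayType(T.IntegerType()))

df3 = (
    df.groupBy("store")
      .agg(F.collect_list("values").alias("values"))
      .select("store", flattenUdf("values").alias("values"))
)
df3.show(truncate=False)
# Expected output:
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+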

Tested with Spark 2.4.4.


2 Comments

This example on PySpark/Spark 2.4 fails with the error NameError: name 'reduce' is not defined. Do you know why?
@AlexOrtner it's a Python 3 issue, not a Spark one; please see the update.
24

Now it is possible to use the flatten function, which makes things a lot easier: you just have to flatten the collected arrays after the groupBy.

from pyspark.sql import functions as F

# 1. Create the DF
df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(["store", "values"])
# +-----+---------+
# |store|   values|
# +-----+---------+
# |    1|[1, 2, 3]|
# |    1|[4, 5, 6]|
# |    2|      [2]|
# |    2|      [3]|
# +-----+---------+

# 2. Group by store
df = df.groupBy("store").agg(F.collect_list("values"))
# +-----+--------------------+
# |store|collect_list(values)|
# +-----+--------------------+
# |    1|[[1, 2, 3], [4, 5...|
# |    2|          [[2], [3]]|
# +-----+--------------------+

# 3. Finally... flatten the array
df = df.withColumn("flatten_array", F.flatten("collect_list(values)"))
# +-----+--------------------+------------------+
# |store|collect_list(values)|     flatten_array|
# +-----+--------------------+------------------+
# |    1|[[1, 2, 3], [4, 5...|[1, 2, 3, 4, 5, 6]|
# |    2|          [[2], [3]]|            [2, 3]|
# +-----+--------------------+------------------+

1 Comment

Nice. Just to add: all of this can be done in a single step via df.groupBy("store").agg(F.flatten(F.collect_list("values"))).
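To spell that comment out, a minimal sketch of the one-step version, assuming the original df from step 1 (before it was reassigned) and F as the pyspark.sql.functions alias; the result name is illustrative:

# One-step version of the comment above (assumes the original `df` and `F` = pyspark.sql.functions).
result = df.groupBy("store").agg(F.flatten(F.collect_list("values")).alias("values"))
result.show(truncate=False)
# Expected output:
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+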
24

For a simple problem like this, you could also use the explode function. I don't know how it performs compared to the accepted UDF answer, though.

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(['store', 'values'])

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+
# |store|values|
# +-----+------+
# |    1|     1|
# |    1|     2|
# |    1|     3|
# |    1|     4|
# |    1|     5|
# |    1|     6|
# |    2|     2|
# |    2|     3|
# +-----+------+

df3 = df2.groupBy('store').agg(F.collect_list('values').alias('values'))
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[4, 5, 6, 1, 2, 3]|
# |2    |[2, 3]            |
# +-----+------------------+

Note: you could use F.collect_set() in the aggregation or .drop_duplicates() on df2 to remove duplicate values.
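For instance, a minimal sketch of the collect_set variant, assuming df2 from the explode step above (the df3_dedup name is just illustrative):

# Deduplicating variant (assumes `df2` from the explode step above).
df3_dedup = df2.groupBy('store').agg(F.collect_set('values').alias('values'))
df3_dedup.show(truncate=False)
# collect_set removes duplicates, but element order within each array is not guaranteed.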

If you want to maintain ordered values in the collected list, I found the following method in another SO answer:

from pyspark.sql.window import Window

w = Window.partitionBy('store').orderBy('values')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('values').over(w))
# +-----+------+-------------------+
# |store|values|ordered_value_lists|
# +-----+------+-------------------+
# |1    |1     |[1]                |
# |1    |2     |[1, 2]             |
# |1    |3     |[1, 2, 3]          |
# |1    |4     |[1, 2, 3, 4]       |
# |1    |5     |[1, 2, 3, 4, 5]    |
# |1    |6     |[1, 2, 3, 4, 5, 6] |
# |2    |2     |[2]                |
# |2    |3     |[2, 3]             |
# +-----+------+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
df4.show(truncate=False)
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

If the values themselves don't determine the order, you can use F.posexplode() and use the 'pos' column in your window functions instead of 'values' to determine order. Note: you will also need a higher-level order column to order the original arrays, and then use the position within each array to order its elements.

df = sc.parallelize([(1, [1, 2, 3], 1), (1, [4, 5, 6], 2), (2, [2], 1), (2, [3], 2)]).toDF(['store', 'values', 'array_order'])
# +-----+---------+-----------+
# |store|values   |array_order|
# +-----+---------+-----------+
# |1    |[1, 2, 3]|1          |
# |1    |[4, 5, 6]|2          |
# |2    |[2]      |1          |
# |2    |[3]      |2          |
# +-----+---------+-----------+

df2 = df.select('*', F.posexplode('values'))
# +-----+---------+-----------+---+---+
# |store|values   |array_order|pos|col|
# +-----+---------+-----------+---+---+
# |1    |[1, 2, 3]|1          |0  |1  |
# |1    |[1, 2, 3]|1          |1  |2  |
# |1    |[1, 2, 3]|1          |2  |3  |
# |1    |[4, 5, 6]|2          |0  |4  |
# |1    |[4, 5, 6]|2          |1  |5  |
# |1    |[4, 5, 6]|2          |2  |6  |
# |2    |[2]      |1          |0  |2  |
# |2    |[3]      |2          |0  |3  |
# +-----+---------+-----------+---+---+

w = Window.partitionBy('store').orderBy('array_order', 'pos')
df3 = df2.withColumn('ordered_value_lists', F.collect_list('col').over(w))
# +-----+---------+-----------+---+---+-------------------+
# |store|values   |array_order|pos|col|ordered_value_lists|
# +-----+---------+-----------+---+---+-------------------+
# |1    |[1, 2, 3]|1          |0  |1  |[1]                |
# |1    |[1, 2, 3]|1          |1  |2  |[1, 2]             |
# |1    |[1, 2, 3]|1          |2  |3  |[1, 2, 3]          |
# |1    |[4, 5, 6]|2          |0  |4  |[1, 2, 3, 4]       |
# |1    |[4, 5, 6]|2          |1  |5  |[1, 2, 3, 4, 5]    |
# |1    |[4, 5, 6]|2          |2  |6  |[1, 2, 3, 4, 5, 6] |
# |2    |[2]      |1          |0  |2  |[2]                |
# |2    |[3]      |2          |0  |3  |[2, 3]             |
# +-----+---------+-----------+---+---+-------------------+

df4 = df3.groupBy('store').agg(F.max('ordered_value_lists').alias('values'))
# +-----+------------------+
# |store|values            |
# +-----+------------------+
# |1    |[1, 2, 3, 4, 5, 6]|
# |2    |[2, 3]            |
# +-----+------------------+

Edit: If you'd like to keep some columns along for the ride and they don't need to be aggregated, you can include them in the groupBy or rejoin them after aggregation (examples below). If they do require aggregation, group by 'store' only and add whatever aggregation function you need for the 'other' column(s) to the .agg() call.

from pyspark.sql import functions as F

df = sc.parallelize([(1, [1, 2, 3], 'a'), (1, [4, 5, 6], 'a'), (2, [2], 'b'), (2, [3], 'b')]).toDF(['store', 'values', 'other'])
# +-----+---------+-----+
# |store|   values|other|
# +-----+---------+-----+
# |    1|[1, 2, 3]|    a|
# |    1|[4, 5, 6]|    a|
# |    2|      [2]|    b|
# |    2|      [3]|    b|
# +-----+---------+-----+

df2 = df.withColumn('values', F.explode('values'))
# +-----+------+-----+
# |store|values|other|
# +-----+------+-----+
# |    1|     1|    a|
# |    1|     2|    a|
# |    1|     3|    a|
# |    1|     4|    a|
# |    1|     5|    a|
# |    1|     6|    a|
# |    2|     2|    b|
# |    2|     3|    b|
# +-----+------+-----+

# Option 1: include the extra column in the groupBy
df3 = df2.groupBy('store', 'other').agg(F.collect_list('values').alias('values'))
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+

# Option 2: rejoin the aggregated values onto the original columns
df4 = (
    df.drop('values')
      .join(
          df2.groupBy('store')
             .agg(F.collect_list('values').alias('values')),
          on=['store'],
          how='inner'
      )
      .drop_duplicates()
)
# +-----+-----+------------------+
# |store|other|            values|
# +-----+-----+------------------+
# |    1|    a|[1, 2, 3, 4, 5, 6]|
# |    2|    b|            [2, 3]|
# +-----+-----+------------------+

2 Comments

How can you keep other columns with this approach?
You can add them into the groupBy function along with store, or you could join the final result with the initial input dataframe on the store column.
7

I would probably do it this way.

>>> df = sc.parallelize([(1, [1, 2, 3]), (1, [4, 5, 6]), (2, [2]), (2, [3])]).toDF(["store", "values"])
>>> df.show()
+-----+---------+
|store|   values|
+-----+---------+
|    1|[1, 2, 3]|
|    1|[4, 5, 6]|
|    2|      [2]|
|    2|      [3]|
+-----+---------+

>>> df.rdd.map(lambda r: (r.store, r.values)).reduceByKey(lambda x, y: x + y).toDF(['store', 'values']).show()
+-----+------------------+
|store|            values|
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+

1 Comment

How would we remove duplicates?
3

Since PySpark 2.4, you can use the following code:

df = df.groupBy("store").agg(collect_list("values").alias("values")) df = df.select("store", array_sort(array_distinct(expr("reduce(values, array(), (x,y) -> concat(x, y))"))).alias("values")) 


2

There is a built-in PySpark function, flatten, for this:

df = df.groupBy("store").agg(f.flatten(f.collect_list("values")).alias("values")) 

See the documentation for pyspark.sql.functions.flatten.

