
I have a dataframe which looks like the one given below. All the values for a corresponding id are the same except for the mappingcol field.

+--------------------+----------------+--------------------+-------+
|misc                |fruit           |mappingcol          |id     |
+--------------------+----------------+--------------------+-------+
|ddd                 |apple           |Map("name"->"Sameer"| 1     |
|ref                 |banana          |Map("name"->"Riyazi"| 2     |
|ref                 |banana          |Map("lname"->"Nikki"| 2     |
|ddd                 |apple           |Map("lname"->"tenka"| 1     |
+--------------------+----------------+--------------------+-------+

I want to merge the rows with the same id in such a way that I get exactly one row per id, with the values of mappingcol merged. The output should look like:

+--------------------+----------------+--------------------+-------+
|misc                |fruit           |mappingcol          |id     |
+--------------------+----------------+--------------------+-------+
|ddd                 |apple           |Map("name"->"Sameer"| 1     |
|ref                 |banana          |Map("name"->"Riyazi"| 2     |
+--------------------+----------------+--------------------+-------+

The value of mappingcol for id = 1 would be:

Map( "name" -> "Sameer", "lname" -> "tenka" ) 

I know that maps can be merged using the ++ operator, so that's not what I'm worried about. I just can't figure out how to merge the rows, because if I use a groupBy, I have nothing to aggregate the rows on.
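For example, the merge itself is trivial in plain Scala (if both maps share a key, the value from the right-hand map wins):

val m1 = Map("name" -> "Sameer")
val m2 = Map("lname" -> "tenka")

val merged = m1 ++ m2
// merged: Map(name -> Sameer, lname -> tenka)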

2 Answers


You can use a groupBy and then do a little work on the map column:

import org.apache.spark.sql.functions.collect_list
import spark.implicits._   // needed for the Dataset encoders

df.groupBy("id", "fruit", "misc").agg(collect_list("mappingcol"))
  .as[(Int, String, String, Seq[Map[String, String]])]
  .map { case (id, fruit, misc, list) => (id, fruit, misc, list.reduce(_ ++ _)) }
  .toDF("id", "fruit", "misc", "mappingColumn")
  • With the first line, you group by your desired columns and aggregate the map pairs into the same element (an array).
  • With the second line (as), you convert the structure to a Dataset of a Tuple4, with the last element being a sequence of maps.
  • With the third line (map), you merge all the maps into a single map.
  • With the last line (toDF), you give the columns their original names.

OUTPUT

+---+------+----+--------------------------------+
|id |fruit |misc|mappingColumn                   |
+---+------+----+--------------------------------+
|1  |apple |ddd |[name -> Sameer, lname -> tenka]|
|2  |banana|ref |[name -> Riyazi, lname -> Nikki]|
+---+------+----+--------------------------------+
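If you want to try this end to end, here is a minimal, illustrative sketch that builds the example data from the question and applies the snippet above (the local SparkSession setup and variable names are assumptions, not part of the original answer):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

// Illustrative only: build the example data and apply the
// groupBy / collect_list / reduce approach from this answer.
val spark = SparkSession.builder().master("local[*]").appName("merge-maps").getOrCreate()
import spark.implicits._

val df = Seq(
  ("ddd", "apple",  Map("name"  -> "Sameer"), 1),
  ("ref", "banana", Map("name"  -> "Riyazi"), 2),
  ("ref", "banana", Map("lname" -> "Nikki"),  2),
  ("ddd", "apple",  Map("lname" -> "tenka"),  1)
).toDF("misc", "fruit", "mappingcol", "id")

val merged = df
  .groupBy("id", "fruit", "misc")
  .agg(collect_list("mappingcol"))
  .as[(Int, String, String, Seq[Map[String, String]])]
  .map { case (id, fruit, misc, maps) => (id, fruit, misc, maps.reduce(_ ++ _)) }
  .toDF("id", "fruit", "misc", "mappingColumn")

merged.show(truncate = false)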

6 Comments

Thank you so much for your answer, but I have a lot of columns and I want to avoid adding them all to the groupBy. Is it possible using a self join, by any chance?
I think the solution of @SCouto is a good approach, otherwise you might need more transformations to achieve the same result.
@CesarA.Mostacero is it possible to achieve this using window functions?
I think a window function does not apply here. Basically the behavior you want to achieve is: keep the same value for a specific key (a composite key of 3 fields) and aggregate the values of one specific column, which is basically the solution by @SCouto.
A window function solution is possible, see the latest answer. As a rule of thumb, anything a groupBy can do can also be done with a window function. The code is meatier, but it gives you finer control over the process.

You can definitely do the above with a Window function!

This is in PySpark, not Scala, but there's almost no difference when only using native Spark functions.

The code below only works on a map column that has one key/value pair per row, as in your example data, but it can be made to work with map columns that have multiple entries.

from pyspark.sql import functions as F
from pyspark.sql import Window

map_col = 'mappingColumn'
group_cols = ['id', 'fruit', 'misc']

# or, a lazier way if you have a lot of columns to group on
cols = df.columns        # save as list
cols.remove(map_col)     # remove what you're not grouping by (mutates the list in place)
group_cols_2 = cols

w = Window.partitionBy(group_cols)

# unpack the map key and value into a pair struct column
df1 = df.withColumn(map_col, F.struct(F.map_keys(map_col)[0], F.map_values(map_col)[0]))

# collect all key/value structs into an array; each row now contains
# the map entries for all rows in its group/window
df1 = df1.withColumn(map_col, F.collect_list(map_col).over(w))

# drop duplicates, as you only want one row per group
df1 = df1.dropDuplicates(group_cols)

# turn the array of entries back into a map
df1 = df1.withColumn(map_col, F.map_from_entries(map_col))

You can save the output of each step to a new column to see how each step works, as I have done below.

from pyspark.sql import functions as F
from pyspark.sql import Window

map_col = 'mappingColumn'
group_cols = ['id', 'fruit', 'misc']
w = Window.partitionBy(group_cols)

df1 = df.withColumn('test', F.struct(F.map_keys(map_col)[0], F.map_values(map_col)[0]))
df1 = df1.withColumn('test1', F.collect_list('test').over(w))
df1 = df1.withColumn('test2', F.map_from_entries('test1'))
df1.show(truncate=False)
df1.printSchema()
df1 = df1.dropDuplicates(group_cols)
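Since the question itself is in Scala, here is a rough sketch of the same window approach in Scala, under the same single-entry-per-row assumption (df is the original dataframe from the question; this is a translation of the idea above, not code from the original answer):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Sketch only: Scala translation of the PySpark window approach,
// assuming each map has exactly one key/value pair per row.
val mapCol = "mappingcol"
val groupCols = Seq("id", "fruit", "misc")
val w = Window.partitionBy(groupCols.map(col): _*)

val result = df
  // unpack the single key and value into a struct
  .withColumn(mapCol, struct(map_keys(col(mapCol))(0), map_values(col(mapCol))(0)))
  // collect every (key, value) struct in the group/window
  .withColumn(mapCol, collect_list(mapCol).over(w))
  // keep one row per group
  .dropDuplicates(groupCols)
  // rebuild a single map from the collected entries
  .withColumn(mapCol, map_from_entries(col(mapCol)))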

1 Comment

In general, window functions are costlier than a groupBy.
