
Merge rows in a Spark DataFrame

I have data like the following:

ID  Name     Passport  Country  License  UpdatedtimeStamp
1   Ostrich  12345     -        ABC      11-02-2018
1   -        -         -        BCD      10-02-2018
1   Shah     12345     -        -        12-02-2018
2   PJ       -         ANB      a        10-02-2018

The required output is:

ID  Name  Passport  Country  License  UpdatedtimeStamp
1   Shah  12345     -        ABC      12-02-2018
2   PJ    -         ANB      a        10-02-2018

Basically, rows with the same ID should be merged: for each column, the latest updated non-null value should appear in the output, and if all values for a column are null, null should be retained.

Please suggest a solution. Also, please suggest one that does not use Spark SQL window functions, as I need it to be very fast.


2 Answers


If you want to stay completely in Spark SQL:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// assumes spark.implicits._ is in scope (e.g. in spark-shell)

val df = Seq(
  (1, Some("ostrich"), Some(12345), None, Some("ABC"), "11-02-2018"),
  (1, None, None, None, Some("BCD"), "10-02-2018"),
  (1, Some("Shah"), Some(12345), None, None, "12-02-2018"),
  (2, Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")
).toDF("ID", "Name", "Passport", "Country", "License", "UpdatedtimeStamp")

// parse the timestamp so the rows can be ordered chronologically
val df1 = df.withColumn("date", to_date($"UpdatedtimeStamp", "MM-dd-yyyy")).drop($"UpdatedtimeStamp")

// number the rows within each ID, newest first
val win = Window.partitionBy("ID").orderBy($"date".desc)
val df2 = df1.select($"*", row_number().over(win).as("r")).orderBy($"ID", $"r").drop("r")

// collect every column per ID; collect_list skips nulls, so the first element
// of each list is the newest non-null value of that column
val exprs = df2.columns.drop(1).map(x => collect_list(x).as(x + "_grp"))
val df3 = df2.groupBy("ID").agg(exprs.head, exprs.tail: _*)

// take the first element of each collected list and restore the original column names
val exprs2 = df3.columns.drop(1).map(x => col(x)(0).as(x.stripSuffix("_grp")))
df3.select((Array(col(df2.columns(0))) ++ exprs2): _*).show

+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License|      date|
+---+----+--------+-------+-------+----------+
|  1|Shah|   12345|   null|    ABC|2018-12-02|
|  2|  PJ|    null|    ANB|      a|2018-10-02|
+---+----+--------+-------+-------+----------+
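Since the question asks to avoid Window functions, here is a minimal window-free sketch of the same idea (my own addition, not part of the original answer). It reuses df1 from the snippet above and, for each value column, aggregates a (date, value) struct with max, which ignores the nulls produced by when and therefore picks the newest non-null value; valueCols and aggs are names made up for this example.

import org.apache.spark.sql.functions._

// sketch: assumes df1 from the snippet above, i.e. the frame with the parsed "date" column
val valueCols = Seq("Name", "Passport", "Country", "License")
val aggs = valueCols.map { c =>
  // rows where the column is null become null structs, which max ignores,
  // so max over (date, value) returns the value from the newest non-null row
  max(when(col(c).isNotNull, struct(col("date"), col(c)))).getField(c).as(c)
} :+ max(col("date")).as("date")

df1.groupBy("ID").agg(aggs.head, aggs.tail: _*).show()

This should produce the same two merged rows as the window-based version above.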


You can achieve your result by defining a udf function and passing the collected struct columns to it; the udf sorts the structs by timestamp and fills nulls with the first non-null values (comments are provided in the code for explanation).

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

//udf function definition
def sortAndAggUdf = udf((structs: Seq[Row]) => {
  //sort the collected list by timestamp in descending order
  val sortedStruct = structs.sortBy(str => str.getAs[Long]("UpdatedtimeStamp"))(Ordering[Long].reverse)
  //select the first (newest) struct and copy it into the out case class
  val first = out(
    sortedStruct(0).getAs[String]("Name"),
    sortedStruct(0).getAs[String]("Passport"),
    sortedStruct(0).getAs[String]("Country"),
    sortedStruct(0).getAs[String]("License"),
    sortedStruct(0).getAs[Long]("UpdatedtimeStamp"))
  //aggregation that checks for nulls and fills each still-empty field
  //with the first not-null value found in the older rows
  sortedStruct.foldLeft(first)((x, y) => {
    out(
      if (x.Name == null || x.Name.isEmpty) y.getAs[String]("Name") else x.Name,
      if (x.Passport == null || x.Passport.isEmpty) y.getAs[String]("Passport") else x.Passport,
      if (x.Country == null || x.Country.isEmpty) y.getAs[String]("Country") else x.Country,
      if (x.License == null || x.License.isEmpty) y.getAs[String]("License") else x.License,
      x.UpdatedtimeStamp)
  })
})

//pack the non-key columns into one struct column and convert UpdatedtimeStamp to long for sorting inside the udf
df.select(col("ID"), struct(
    col("Name"), col("Passport"), col("Country"), col("License"),
    unix_timestamp(col("UpdatedtimeStamp"), "MM-dd-yyyy").as("UpdatedtimeStamp")).as("struct"))
  //group, collect the structs and pass them to the udf function for manipulation
  .groupBy("ID").agg(sortAndAggUdf(collect_list("struct")).as("struct"))
  //split the aggregated struct back into separate columns
  .select(col("ID"), col("struct.*"))
  //get the date back into the original format
  .withColumn("UpdatedtimeStamp", date_format(col("UpdatedtimeStamp").cast("timestamp"), "MM-dd-yyyy"))
  .show(false)

which should give you

+---+----+--------+-------+-------+----------------+
|ID |Name|Passport|Country|License|UpdatedtimeStamp|
+---+----+--------+-------+-------+----------------+
|1  |Shah|12345   |null   |ABC    |12-02-2018      |
|2  |PJ  |null    |ANB    |a      |10-02-2018      |
+---+----+--------+-------+-------+----------------+

And, of course, a case class is needed:

case class out(Name: String, Passport: String, Country: String, License: String, UpdatedtimeStamp: Long) 

2 Comments

Hi, this solution works. However, I need help expanding it: I have more than 30 fields in the dataframe, and Spark 1.6 (Scala 2.10) does not allow a case class with more than 22 fields. Also, there is more than one last-updated column, each corresponding to different fields. Can you suggest the best way to achieve this?
I know this is old, but it may be helpful to others, so here is a suggestion: you can use a normal class in place of the case class.
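For readers hitting that 22-field ceiling, one possible workaround is sketched below (my own assumption, not taken from the comments): skip the class entirely, declare the result schema as an explicit StructType, and have the udf return a plain Row via the untyped udf(f, dataType) overload, which has no field-count limit. The names resultSchema, firstNonNull and sortAndAggRowUdf are made up for this example.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

// explicit result schema instead of a case class; extend it with the remaining fields as needed
val resultSchema = StructType(Seq(
  StructField("Name", StringType),
  StructField("Passport", StringType),
  StructField("Country", StringType),
  StructField("License", StringType),
  StructField("UpdatedtimeStamp", LongType)))

// untyped udf: the return schema is supplied explicitly, so no case class is required
val sortAndAggRowUdf = udf((structs: Seq[Row]) => {
  // newest first, as in the answer above
  val sorted = structs.sortBy(_.getAs[Long]("UpdatedtimeStamp"))(Ordering[Long].reverse)
  // first non-null (and non-empty) value for a field, scanning from newest to oldest
  def firstNonNull(field: String): Any =
    sorted.map(_.getAs[Any](field)).find(v => v != null && v != "").orNull
  Row(firstNonNull("Name"), firstNonNull("Passport"), firstNonNull("Country"),
    firstNonNull("License"), sorted.head.getAs[Long]("UpdatedtimeStamp"))
}, resultSchema)

It would be called exactly like sortAndAggUdf in the answer, i.e. sortAndAggRowUdf(collect_list("struct")).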
