
I have Spark DataFrames mainDF and deltaDF, both with a matching schema.

Content of the mainDF is as follows:

id | name | age
1  | abc  | 23
2  | xyz  | 34
3  | pqr  | 45

Content of deltaDF is as follows:

id | name | age
1  | lmn  | 56
4  | efg  | 37

I want to merge deltaDF with mainDF based on the value of id. If an id already exists in mainDF then the record should be updated, and if the id doesn't exist then the new record should be added. So the resulting data frame should look like this:

id | name | age
1  | lmn  | 56
2  | xyz  | 34
3  | pqr  | 45
4  | efg  | 37

This is my current code and it is working:

 val updatedDF = mainDF.as("main")
   .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "inner")
   .select($"main.id", $"main.name", $"main.age")

 mainDF = mainDF.except(updatedDF).unionAll(deltaDF)

However, here I need to explicitly provide the list of columns again in the select function, which feels like overhead to me. Is there any better/cleaner approach to achieve the same?

Comments:

  • Have you tried join()? Commented Oct 21, 2016 at 7:25
  • It's not really in line with your question, but you can also do a union with a timestamp (a dummy one if you have none) and drop lines with a duplicate id and an older timestamp. Commented Oct 21, 2016 at 7:48
  • Why didn't you use union? Commented Oct 21, 2016 at 8:55
  • I am using union. I forgot to add that code. Commented Oct 21, 2016 at 9:23
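The timestamp idea from the comments above can be sketched like this (a sketch, assuming Spark 2.x; a literal priority column stands in for the dummy "timestamp" so delta rows always win):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

val spark = SparkSession.builder.master("local[*]").appName("merge").getOrCreate()
import spark.implicits._

val mainDF  = Seq((1, "abc", 23), (2, "xyz", 34), (3, "pqr", 45)).toDF("id", "name", "age")
val deltaDF = Seq((1, "lmn", 56), (4, "efg", 37)).toDF("id", "name", "age")

// Tag each side with a priority (the dummy "timestamp"), union both,
// then keep only the highest-priority row per id.
val merged = mainDF.withColumn("prio", lit(0))
  .union(deltaDF.withColumn("prio", lit(1)))
  .withColumn("rn", row_number().over(Window.partitionBy("id").orderBy(col("prio").desc)))
  .where($"rn" === 1)
  .drop("prio", "rn")
```

With a real timestamp column you would order the window by it instead of the synthetic `prio` column.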

3 Answers


If you don't want to provide the list of columns explicitly, you can map over the original DF's columns, something like:

.select(mainDF.columns.map(c => $"main.$c" as c): _*) 

BTW, you can do this without a union after the join: use an outer join to also get records that exist in only one of the DFs, and then use coalesce to "choose" the non-null value, preferring deltaDF's values. So the complete solution would be something like:

val updatedDF = mainDF.as("main")
  .join(deltaDF.as("delta"), $"main.id" === $"delta.id", "outer")
  .select(mainDF.columns.map(c => coalesce($"delta.$c", $"main.$c") as c): _*)

updatedDF.show
// +---+----+---+
// | id|name|age|
// +---+----+---+
// |  1| lmn| 56|
// |  3| pqr| 45|
// |  4| efg| 37|
// |  2| xyz| 34|
// +---+----+---+



You can achieve this by using dropDuplicates and specifying which column you don't want any duplicates on.

Here's working code:

 val a = (1, "lmn", 56) :: (2, "abc", 23) :: (3, "pqr", 45) :: Nil
 val b = (1, "opq", 12) :: (5, "dfg", 78) :: Nil

 val df1 = sc.parallelize(a).toDF
 val df2 = sc.parallelize(b).toDF

 df1.unionAll(df2).dropDuplicates("_1" :: Nil).show()

 +---+---+---+
 | _1| _2| _3|
 +---+---+---+
 |  1|lmn| 56|
 |  2|abc| 23|
 |  3|pqr| 45|
 |  5|dfg| 78|
 +---+---+---+

1 Comment

It doesn't work because dropDuplicates randomly keeps one of the duplicate records; in a merge scenario the latest record has to be picked, ignoring the old one.

Another way of doing so, a PySpark implementation:

from pyspark.sql.functions import col

# full outer join so ids that exist in only one of the DFs are kept
joinedDF = mainDF.alias("main").join(
    deltaDF.alias("delta"), col("main.id") == col("delta.id"), "full")

# delta's version wins where it exists; otherwise keep main's row
upsertDF = joinedDF.where(col("delta.id").isNotNull()).select("delta.*")
unchangedDF = joinedDF.where(col("delta.id").isNull()).select("main.*")
finalDF = upsertDF.union(unchangedDF)
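The same idea can be written more compactly in Scala with a left anti join (a sketch, assuming Spark 2.x where `left_anti` and `union` are available): keep every delta row, and append only the main rows whose id is absent from delta. This avoids listing columns entirely.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("merge").getOrCreate()
import spark.implicits._

val mainDF  = Seq((1, "abc", 23), (2, "xyz", 34), (3, "pqr", 45)).toDF("id", "name", "age")
val deltaDF = Seq((1, "lmn", 56), (4, "efg", 37)).toDF("id", "name", "age")

// delta rows always win; main contributes only the ids delta doesn't have
val merged = deltaDF.union(mainDF.join(deltaDF, Seq("id"), "left_anti"))
```

Since both sides of the union come straight from the original DataFrames, the matching schema is preserved without an explicit select.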

