
We need to merge multiple rows into a single record per ID using PySpark. If a column was updated more than once, we have to keep the value from the most recent update. Please note, NULL means that no update was made to that column in that row. So, basically, we have to produce one row per ID containing the consolidated updates. For example, given this dataframe ...

I'm looking for an answer similar to Merge rows in a spark scala Dataframe, but in PySpark.

------------------------------------------------------------
| id  | column1       | column2       | updated_at |
------------------------------------------------------------
| 123 | update1       | <*no-update*> | 1634228709 |
| 123 | <*no-update*> | 80            | 1634228724 |
| 123 | update2       | <*no-update*> | 1634229000 |

The expected output is:

------------------------------------------------------------
| id  | column1 | column2 | updated_at |
------------------------------------------------------------
| 123 | update2 | 80      | 1634229000 |
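To make this easier to reproduce, here is a minimal sketch of how the input above can be built in PySpark (the app name is arbitrary, and <*no-update*> is represented as null, since NULL here just means no update):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("merge-example").getOrCreate()

# <*no-update*> from the table above is represented as None (no update in that row)
rows = [
    (123, "update1", None, 1634228709),
    (123, None, 80, 1634228724),
    (123, "update2", None, 1634229000),
]
df = spark.createDataFrame(rows, ["id", "column1", "column2", "updated_at"])
df.show()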
  • Why is column2 80 in the result? Isn't it supposed to be NULL if you want the data from the latest update? Or are there more rules? Commented Oct 14, 2021 at 20:12
  • @Emerson Sorry for the confusion. NULL actually means no update was made to that column in that row. Commented Oct 15, 2021 at 5:36

1 Answer


Let's say that our input dataframe is:

+---+-------+----+----------+
|id |col1   |col2|updated_at|
+---+-------+----+----------+
|123|null   |null|1634228709|
|123|null   |80  |1634228724|
|123|update2|90  |1634229000|
|12 |update1|null|1634221233|
|12 |null   |80  |1634228333|
|12 |update2|null|1634221220|
+---+-------+----+----------+

First we want to convert updated_at to TimestampType, then order by id and by updated_at in descending order:

df = df.withColumn("updated_at", F.col("updated_at").cast(TimestampType())).orderBy(
    F.col("id"), F.col("updated_at").desc()
)

That gives us:

+---+-------+----+-------------------+
|id |col1   |col2|updated_at         |
+---+-------+----+-------------------+
|12 |null   |80  |2021-10-14 18:18:53|
|12 |update1|null|2021-10-14 16:20:33|
|12 |update2|null|2021-10-14 16:20:20|
|123|update2|90  |2021-10-14 18:30:00|
|123|null   |80  |2021-10-14 18:25:24|
|123|null   |null|2021-10-14 18:25:09|
+---+-------+----+-------------------+

Now group by id and take the first non-null value in each of the remaining columns (or null if there is none):

exp = [F.first(x, ignorenulls=True).alias(x) for x in df.columns[1:]]
df = df.groupBy(F.col("id")).agg(*exp)

And the result is:

+---+-------+----+-------------------+
|id |col1   |col2|updated_at         |
+---+-------+----+-------------------+
|123|update2|90  |2021-10-14 18:30:00|
|12 |update1|80  |2021-10-14 18:18:53|
+---+-------+----+-------------------+
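One caveat: F.first(..., ignorenulls=True) is documented as non-deterministic, because the row order produced by the preceding orderBy is not guaranteed to survive the shuffle that groupBy introduces (it does here on a single local partition). If you want a variant that does not depend on the incoming row order, a window-based sketch could look like this, applied to the dataframe as it is right after the timestamp cast and before the groupBy (the names w and consolidated are just illustrative):

from pyspark.sql import Window

# One window per id, ordered by update time, spanning the whole partition
w = (
    Window.partitionBy("id")
    .orderBy(F.col("updated_at").asc())
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# last(..., ignorenulls=True) over this frame picks the most recent non-null value per column
consolidated = df.select(
    "id",
    *[F.last(c, ignorenulls=True).over(w).alias(c) for c in df.columns[1:]],
).dropDuplicates(["id"])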

Here's the full example code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import TimestampType

if __name__ == "__main__":
    spark = SparkSession.builder.master("local").appName("Test").getOrCreate()

    data = [
        (123, None, None, 1634228709),
        (123, None, 80, 1634228724),
        (123, "update2", 90, 1634229000),
        (12, "update1", None, 1634221233),
        (12, None, 80, 1634228333),
        (12, "update2", None, 1634221220),
    ]
    columns = ["id", "col1", "col2", "updated_at"]
    df = spark.createDataFrame(data, columns)

    # Cast the epoch seconds to a timestamp and sort by id, newest update first
    df = df.withColumn("updated_at", F.col("updated_at").cast(TimestampType())).orderBy(
        F.col("id"), F.col("updated_at").desc()
    )

    # For every column except id, take the first non-null value within each id group
    exp = [F.first(x, ignorenulls=True).alias(x) for x in df.columns[1:]]
    df = df.groupBy(F.col("id")).agg(*exp)
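To actually print the consolidated result, add a show call at the end of the main block (presumably how the tables above were produced):

    # Display the merged rows without truncating column values
    df.show(truncate=False)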