
I have a spark df which I need to use to identify the last active record for each primary key based on a snapshot date. An example of what I have is:

A B C Snap
1 2 3 2019-12-29
1 2 4 2019-12-31

where the primary key is formed by fields A and B. I need to create a new field to indicate which record is active (the one with the latest snap for each set of rows sharing the same PK). So I need something like this:

A B C Snap activity
1 2 3 2019-12-29 false
1 2 4 2019-12-31 true

I have done this by creating an auxiliary df and then joining it back to the first one to bring in the active indicator, but my original df is very big and I need something better in terms of performance. I have been thinking about window functions, but I don't know how to implement one here.

Once I have this, I need to create a second new field for the end date of the record. It should be filled only where the activity field is false, by subtracting one day from the latest snap date for each set of rows with the same PK. I would need something like this:

A B C Snap activity end
1 2 3 2019-12-29 false 2019-12-30
1 2 4 2019-12-31 true

1 Answer

You can use row_number() ordered by Snap in descending order: the first row in each partition is the latest, i.e. active, snap:

df.selectExpr(
    '*',
    'row_number() over (partition by A, B order by Snap desc) = 1 as activity'
).show()

+---+---+---+----------+--------+
|  A|  B|  C|      Snap|activity|
+---+---+---+----------+--------+
|  1|  2|  4|2019-12-31|    true|
|  1|  2|  3|2019-12-29|   false|
+---+---+---+----------+--------+

Edit: to get the end date for each group, use the max window function on Snap:

import pyspark.sql.functions as f

df.withColumn(
    'activity',
    f.expr('row_number() over (partition by A, B order by Snap desc) = 1')
).withColumn(
    'end',
    f.expr('case when activity then null else max(date_add(to_date(Snap), -1)) over (partition by A, B) end')
).show()

+---+---+---+----------+--------+----------+
|  A|  B|  C|      Snap|activity|       end|
+---+---+---+----------+--------+----------+
|  1|  2|  4|2019-12-31|    true|      null|
|  1|  2|  3|2019-12-29|   false|2019-12-30|
+---+---+---+----------+--------+----------+

1 Comment

and for the second part? How can I pick the snap date of the record above and subtract one day? For subtracting one day I have tried something like this: df = df.withColumn('endDate', when(col("activity") == False, date_add(df['SNAP'], -1)).otherwise(lit(""))) but I still don't know how to take the date
