13

I have a dataframe:

DF:

1,2016-10-12 18:24:25 1,2016-11-18 14:47:05 2,2016-10-12 21:24:25 2,2016-10-12 20:24:25 2,2016-10-12 22:24:25 3,2016-10-12 17:24:25 

How to keep only latest record for each group? (there are 3 groups above (1,2,3)).

Result should be:

1,2016-11-18 14:47:05 2,2016-10-12 22:24:25 3,2016-10-12 17:24:25 

Trying also to make it efficient (e.g. to finish within few short minutes on a moderate cluster with 100 million records), so sorting/ordering should be done (if they are required) in most efficient and correct manner..

1

2 Answers 2

20

You have to use the window function.

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

you have to partition the window by the group and OrderBy time, below pyspark script do the work

from pyspark.sql.functions import * from pyspark.sql.window import Window schema = "Group int,time timestamp " df = spark.read.format('csv').schema(schema).options(header=False).load('/FileStore/tables/Group_window.txt') w = Window.partitionBy('Group').orderBy(desc('time')) df = df.withColumn('Rank',dense_rank().over(w)) df.filter(df.Rank == 1).drop(df.Rank).show() +-----+-------------------+ |Group| time| +-----+-------------------+ | 1|2016-11-18 14:47:05| | 3|2016-10-12 17:24:25| | 2|2016-10-12 22:24:25| +-----+-------------------+ ``` 
Sign up to request clarification or add additional context in comments.

1 Comment

you should use row_number() instead of dense_rank() because dense_rank() gives the same rank to 'tied' rows
0

You can use window functions as described here for cases like this:

scala> val in = Seq((1,"2016-10-12 18:24:25"), | (1,"2016-11-18 14:47:05"), | (2,"2016-10-12 21:24:25"), | (2,"2016-10-12 20:24:25"), | (2,"2016-10-12 22:24:25"), | (3,"2016-10-12 17:24:25")).toDF("id", "ts") in: org.apache.spark.sql.DataFrame = [id: int, ts: string] scala> import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.expressions.Window scala> val win = Window.partitionBy("id").orderBy("ts desc") win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@59fa04f7 scala> in.withColumn("rank", row_number().over(win)).where("rank == 1").show(false) +---+-------------------+----+ | id| ts|rank| +---+-------------------+----+ | 1|2016-11-18 14:47:05| 1| | 3|2016-10-12 17:24:25| 1| | 2|2016-10-12 22:24:25| 1| +---+-------------------+----+ 

3 Comments

How do you write this in Pyspark?
The link I included features several examples in Pyspark
Thanks, I was able to transfer all but last line of code: in.withColumn("rank", row_number().over(win)).where('rank === 1).show(false)

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.