I have a DataFrame (df) with the following structure: Grid_ID, Latitude, Longitude, DateTimeStamp.

Input:

Grid_ID  Latitude  Longitude  DateTimeStamp
Grid_1   Lat1      Long1      2021-06-30 00:00:00
Grid_1   Lat1      Long1      2021-06-30 00:01:00
Grid_1   Lat1      Long1      2021-06-30 00:02:00
Grid_1   Lat2      Long2      2021-07-01 00:00:00
Grid_1   Lat2      Long2      2021-07-01 00:01:00
Grid_1   Lat2      Long2      2021-07-01 00:02:00

Each Grid_ID has two sets of latitude/longitude that are mutually exclusive based on the date: when Date <= 2021-06-30, Latitude/Longitude = Lat1/Long1, and when Date > 2021-06-30, Latitude/Longitude = Lat2/Long2.

I need to create new columns (Corrected_Lat and Corrected_Long) and assign Lat2/Long2 to them on every row.
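For reference, the input above can be mocked up like this (the numeric coordinates are placeholders standing in for Lat1/Long1 and Lat2/Long2):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder coordinates: 10.0/20.0 stand in for Lat1/Long1, 11.0/21.0 for Lat2/Long2
rows = [
    ("Grid_1", 10.0, 20.0, "2021-06-30 00:00:00"),
    ("Grid_1", 10.0, 20.0, "2021-06-30 00:01:00"),
    ("Grid_1", 10.0, 20.0, "2021-06-30 00:02:00"),
    ("Grid_1", 11.0, 21.0, "2021-07-01 00:00:00"),
    ("Grid_1", 11.0, 21.0, "2021-07-01 00:01:00"),
    ("Grid_1", 11.0, 21.0, "2021-07-01 00:02:00"),
]
df = spark.createDataFrame(rows, ["Grid_ID", "Latitude", "Longitude", "DateTimeStamp"])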

I am using groupBy and agg as below to do the above:

from pyspark.sql import functions as F

df_dated = df.withColumn("date", F.to_date("DateTimeStamp")) \
    .filter(F.col("date") == "2021-07-01") \
    .groupBy("Grid_ID") \
    .agg(F.collect_set("Latitude").getItem(0).cast("float").alias("corrected_lat"),
         F.collect_set("Longitude").getItem(0).cast("float").alias("corrected_long")) \
    .withColumnRenamed("Grid_ID", "Grid_ID_dated") \
    .select("Grid_ID_dated", "corrected_lat", "corrected_long")

df_final = df.join(df_dated, on=[df.Grid_ID == df_dated.Grid_ID_dated], how="inner") \
    .select(*df.columns, "corrected_lat", "corrected_long")
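As an aside, collect_set does not guarantee element order, so getItem(0) is only safe here because every row on the cut-over date carries the same coordinate pair per Grid_ID. A minimal sketch of the same aggregation with F.first, which avoids building a set, could look like this:

# Sketch: same filtered aggregation with F.first instead of collect_set;
# valid here because all rows on 2021-07-01 share one coordinate pair per Grid_ID.
df_dated = df.withColumn("date", F.to_date("DateTimeStamp")) \
    .filter(F.col("date") == "2021-07-01") \
    .groupBy("Grid_ID") \
    .agg(F.first("Latitude").cast("float").alias("corrected_lat"),
         F.first("Longitude").cast("float").alias("corrected_long")) \
    .withColumnRenamed("Grid_ID", "Grid_ID_dated")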

Output:

Grid_ID  Latitude  Longitude  DateTimeStamp        corrected_lat  corrected_long
Grid_1   Lat1      Long1      2021-06-30 00:00:00  Lat2           Long2
Grid_1   Lat1      Long1      2021-06-30 00:01:00  Lat2           Long2
Grid_1   Lat1      Long1      2021-06-30 00:02:00  Lat2           Long2
Grid_1   Lat2      Long2      2021-07-01 00:00:00  Lat2           Long2
Grid_1   Lat2      Long2      2021-07-01 00:01:00  Lat2           Long2
Grid_1   Lat2      Long2      2021-07-01 00:02:00  Lat2           Long2

But I am wondering whether a window function can be used here, and whether it would be faster than the first approach using groupBy and agg?

Any other approach that is faster is certainly appreciated.

4 Comments
  • Can you add examples of input and output? Why are you filtering on this specific date: filter(F.col("date") == "2021-07-01")? Commented Aug 30, 2022 at 17:55
  • @bzu Out of the two sets of coordinates for each Grid_ID, I need to keep the 2nd set (Lat2/Long2). The second set of coordinates is available for timestamps on or after 2021-07-01. Before this date, the Grid_ID has the former set of coordinates (Lat1/Long1). Commented Aug 30, 2022 at 18:09
  • So basically you want to keep one line per Grid_ID with date >= 2021-07-01? Are Lat2/Long2 the same across all rows for each Grid_ID? Commented Aug 30, 2022 at 18:19
  • Yes, there are currently two sets of lat/long for each Grid_ID, but the final output should have one set of lat/long (the latter set, i.e. Lat2/Long2) for each Grid_ID. Commented Aug 30, 2022 at 19:45

1 Answer

This will apply the latest Latitude/Longitude value to all rows (per Grid_ID):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = (
    Window.partitionBy("Grid_ID")
    .orderBy("DateTimeStamp")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df_final = (
    df.withColumn("date", F.to_date("DateTimeStamp"))
    .select(
        *df.columns,
        F.last("Latitude").over(w).alias("corrected_lat"),
        F.last("Longitude").over(w).alias("corrected_long"),
    )
)
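Alternatively, a sketch that avoids the explicit sort order on the window (a common trick, sketched here rather than benchmarked): take the max of a struct keyed by DateTimeStamp, which selects the latest row's coordinates per Grid_ID.

# Sketch: max over a struct whose first field is DateTimeStamp picks the
# coordinates of the most recent row in each Grid_ID partition.
w2 = Window.partitionBy("Grid_ID")
latest = F.max(F.struct("DateTimeStamp", "Latitude", "Longitude")).over(w2)

df_final2 = df.select(
    *df.columns,
    latest["Latitude"].alias("corrected_lat"),
    latest["Longitude"].alias("corrected_long"),
)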

4 Comments

Shouldn't the orderBy("DateTimeStamp") be orderBy("date")?
You could also use "date". In any case the window will be sorted, and with last() you can get the most recent value. Does this work for you?
It certainly did. I wasn't aware of rowsBetween with the options you pointed me to. Would you also happen to know how the performance of windows compares with aggregation using groupBy, i.e. whether one is faster in Spark than the other (1) universally or (2) depending on the situation?
I don't know if there is a general answer to this. I would look at df_final.explain() and try to get any hints from there, or do a couple of benchmarks on a subset of the input data; a rough sketch follows below.
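A minimal sketch of that comparison (df_final_window and df_final_groupby are hypothetical names for the two results computed above):

import time

# Compare the physical plans: look for a Window node in one plan versus
# HashAggregate plus a join in the other.
df_final_window.explain()
df_final_groupby.explain()

# Rough wall-clock benchmark: count() forces full execution of each plan.
for name, frame in [("window", df_final_window), ("groupBy", df_final_groupby)]:
    start = time.time()
    frame.count()
    print(name, time.time() - start)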
