
I have a dataset of application logs that record when an app was launched or closed. Sometimes one of the related events is missing entirely from the logs. I want to match each app start with its corresponding end event (if it exists).

Here's an illustrative dataset:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
    ['Group1', 'Logon', 'Name1', '2021-02-05T19:03:00.000+0000'],
    ['Group1', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
    ['Group1', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
    ['Group1', 'End',   'Name1', '2021-02-05T19:06:00.000+0000'],
    ['Group1', 'End',   'Name3', '2021-02-05T19:06:01.000+0000'],
    ['Group1', 'End',   'Name1', '2021-02-05T19:07:00.000+0000'],
    ['Group2', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
    ['Group2', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
    ['Group2', 'Start', 'Name2', '2021-02-05T19:06:00.000+0000'],
    ['Group2', 'End',   'Name1', '2021-02-05T19:07:00.000+0000'],
    ['Group2', 'Close', 'Name1', '2021-02-05T19:07:00.000+0000'],
], ['group', 'type', 'name', 'time'])

df = df.withColumn('time', F.col('time').cast('timestamp'))

For each group separately, I want to assign a common identifier to a 'Start' event and an 'End' event when they share the same 'name'. In other words, for each 'Start' event I want to find the first 'End' event that has not already been matched to another 'Start' event.
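To make the rule concrete, here is a rough plain-Python sketch of the pairing I have in mind for a single group (just an illustration with a made-up helper, not the Spark logic I am looking for):

def pair_events(starts, ends):
    # starts and ends: lists of (name, time) tuples, each sorted by time,
    # for a single group
    matched = [False] * len(ends)
    pairs = []
    for s_name, s_time in starts:
        for i, (e_name, e_time) in enumerate(ends):
            if not matched[i] and e_name == s_name:
                matched[i] = True  # this 'End' is now taken
                pairs.append((s_name, s_time, e_time))
                break
    return pairs

# e.g. for Group1:
# pair_events([('Name1', '19:04'), ('Name1', '19:05')],
#             [('Name1', '19:06'), ('Name3', '19:06:01'), ('Name1', '19:07')])
# -> [('Name1', '19:04', '19:06'), ('Name1', '19:05', '19:07')]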

The expected result could be something like the following picture:

[image: the input rows with an added 'my_group' column that pairs each 'Start' with its matching 'End']

I don't mind whether the identifier (the 'my_group' column) is an ID, a timestamp, or something monotonically increasing across groups. I just want to be able to match the relevant events within each group.

What I've tried

I thought about using window functions to find the end time for 'Start' events and the start time for 'End' events. However, I cannot restrict the search to only 'End' events (or only 'Start' events, respectively). Nor can I apply the logic described above of finding the first 'End' event that has not already been matched to another 'Start' event.

Here's my code:

# search in the future
app_session_window_down = Window.partitionBy('group', 'name') \
    .orderBy(F.col('time').cast('long')) \
    .rangeBetween(1, Window.unboundedFollowing)

# search in the past
app_session_window_up = Window.partitionBy('group', 'name') \
    .orderBy(F.col('time').cast('long')) \
    .rangeBetween(Window.unboundedPreceding, -1)

df = df.withColumn(
    'app_time_end',
    F.when(
        F.col('type') == 'Start',
        F.first(F.col('time'), ignorenulls=True).over(app_session_window_down)
    ).otherwise(F.lit('None'))
).withColumn(
    'app_time_start',
    F.when(
        F.col('type') == 'End',
        F.last(F.col('time'), ignorenulls=True).over(app_session_window_up)
    ).otherwise(F.col('app_time_end'))
)

which gives:

[image: the resulting dataframe with the 'app_time_end' and 'app_time_start' columns]

This is nowhere close to what I want to achieve. Any hints?

1 Answer


Explanations are in the inline comments:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'my_group',  # the column you wanted
    F.when(
        F.col('type').isin(['Start', 'End']),
        F.row_number().over(Window.partitionBy('group', 'name', 'type').orderBy('time'))
    )
).withColumn(
    'max_group',  # helper column: get maximum row_number for each group; will be used later
    F.least(
        F.max(
            F.when(
                F.col('type') == 'Start',
                F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group', 'name')),
        F.max(
            F.when(
                F.col('type') == 'End',
                F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group', 'name'))
    )
).withColumn(
    'my_group',  # mask the rows which don't have a corresponding 'Start'/'End'
    F.when(
        F.col('my_group') <= F.col('max_group'),
        F.col('my_group')
    )
).withColumn(
    'my_group',  # add the group name
    F.when(
        F.col('my_group').isNotNull(),
        F.concat_ws('_', 'group', 'name', 'my_group')
    )
).drop('max_group').orderBy('group', 'time')
df2.show()
+------+-----+-----+-------------------+--------------+
| group| type| name|               time|      my_group|
+------+-----+-----+-------------------+--------------+
|Group1|Logon|Name1|2021-02-05 19:03:00|          null|
|Group1|Start|Name1|2021-02-05 19:04:00|Group1_Name1_1|
|Group1|Start|Name1|2021-02-05 19:05:00|Group1_Name1_2|
|Group1|  End|Name1|2021-02-05 19:06:00|Group1_Name1_1|
|Group1|  End|Name3|2021-02-05 19:06:01|          null|
|Group1|  End|Name1|2021-02-05 19:07:00|Group1_Name1_2|
|Group2|Start|Name1|2021-02-05 19:04:00|Group2_Name1_1|
|Group2|Start|Name1|2021-02-05 19:05:00|          null|
|Group2|Start|Name2|2021-02-05 19:06:00|          null|
|Group2|  End|Name1|2021-02-05 19:07:00|Group2_Name1_1|
|Group2|Close|Name1|2021-02-05 19:07:00|          null|
+------+-----+-----+-------------------+--------------+
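If you want to double-check the pairing, one simple sanity check (an extra sketch, not needed for the result above) is to collapse each identifier onto a single row and compare the matched times:

pairs = (
    df2
    .filter(F.col('my_group').isNotNull())
    .groupBy('my_group')
    .agg(
        # each identifier has exactly one 'Start' and one 'End' row,
        # so min() just picks that row's timestamp
        F.min(F.when(F.col('type') == 'Start', F.col('time'))).alias('start_time'),
        F.min(F.when(F.col('type') == 'End', F.col('time'))).alias('end_time'),
    )
    .orderBy('my_group')
)
pairs.show()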

2 Comments

Thanks @mck. This gets close, but it does not work as expected if I add a new row to Group1. I have updated the input DF above.
How can I make sure that the matched 'End' event is always after the 'Start' event?
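One possible way to enforce that (a rough sketch building on the answer above, not part of the original answer and not tested on every edge case) is to join each 'End' back to the 'Start' that carries the same row number and only keep the pair if the 'End' actually comes after it:

# start from the original input df (before the app_time_* columns were added)
w_pair = Window.partitionBy('group', 'name', 'type').orderBy('time')

df3 = df.withColumn(
    'seq',  # nth 'Start' / nth 'End' per (group, name)
    F.when(F.col('type').isin(['Start', 'End']), F.row_number().over(w_pair))
)

# time of the 'Start' that has the same sequence number
starts = df3.filter(F.col('type') == 'Start') \
    .select('group', 'name', 'seq', F.col('time').alias('start_time'))

df3 = df3.join(starts, on=['group', 'name', 'seq'], how='left') \
    .withColumn(
        'my_group',
        F.when(
            (F.col('type') == 'Start') |
            ((F.col('type') == 'End') & (F.col('time') > F.col('start_time'))),
            F.concat_ws('_', 'group', 'name', 'seq')
        )
    ).drop('seq', 'start_time')

# Note: unlike the answer above, a 'Start' with no matching 'End' keeps its
# identifier here; the max_group masking from the answer can be reapplied if
# unmatched 'Start' events should stay null as well.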
