
I have a DataFrame with text messages and a timestamp value for each row, like so:

+--------------------------+---------------------+
| message                  | timestamp           |
+--------------------------+---------------------+
| some text from message 1 | 2019-08-03 01:00:00 |
+--------------------------+---------------------+
| some text from message 2 | 2019-08-03 01:01:00 |
+--------------------------+---------------------+
| some text from message 3 | 2019-08-03 01:03:00 |
+--------------------------+---------------------+

I need to concatenate the messages into time windows of X minutes so that, for example, the result looks like this:

+---------------------------------------------------+
| message                                           |
+---------------------------------------------------+
| some text from message 1 some text from message 2 |
+---------------------------------------------------+
| some text from message 3                          |
+---------------------------------------------------+

After the concatenation I no longer need the timestamp column, so I can drop it or keep it with any value.

I have been able to do this by iterating through the entire DataFrame, accumulating timestamp differences and inserting into a new DataFrame whenever a time window is complete. It works, but it's ugly, and I am looking for pointers on how to accomplish this in Scala in a more functional/elegant way.
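For comparison, the loop described above can be written without mutation as a single `foldLeft`. This is a sketch on a plain Scala `List`, not a DataFrame, and it assumes the input is already sorted by time and that a window is complete once a message arrives `width` or more after the window's first message. `concatByWindow` and its parameters are hypothetical names. These windows start at their first message, whereas fixed time windows are typically aligned to clock boundaries.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical input: (message, timestamp) pairs, assumed already sorted by timestamp.
val messages: List[(String, LocalDateTime)] = List(
  ("some text from message 1", LocalDateTime.parse("2019-08-03T01:00:00")),
  ("some text from message 2", LocalDateTime.parse("2019-08-03T01:01:00")),
  ("some text from message 3", LocalDateTime.parse("2019-08-03T01:03:00"))
)

// Groups messages into windows that start at their first message and span `width`.
def concatByWindow(msgs: List[(String, LocalDateTime)], width: Duration): List[String] = {
  // Accumulator: completed groups (newest first) and the open group's start time
  // plus its messages (newest first), or None before the first message.
  val init = (List.empty[List[String]], Option.empty[(LocalDateTime, List[String])])
  val (done, open) = msgs.foldLeft(init) {
    case ((acc, None), (msg, ts)) =>
      (acc, Some((ts, List(msg))))
    case ((acc, Some((start, cur))), (msg, ts)) =>
      if (Duration.between(start, ts).compareTo(width) < 0)
        (acc, Some((start, msg :: cur)))  // still inside the current window
      else
        (cur :: acc, Some((ts, List(msg)))) // close the window, open a new one at `ts`
  }
  // Add the still-open group, then restore chronological order at both levels.
  val all = open.map { case (_, cur) => cur :: done }.getOrElse(done)
  all.reverse.map(_.reverse.mkString(" "))
}

val result = concatByWindow(messages, Duration.ofMinutes(2))
```

With a 2-minute width, messages 1 and 2 land in the first window and message 3 starts a second one, giving the two strings from the expected output.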

I looked at Window functions, but since I am not doing aggregations, it appears that I have no way to access the content of the groups once the WindowSpec is created, so I didn't get very far.

I also looked at the lead and lag functions, but I couldn't figure out how to use them without also resorting to a for loop.
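`lag` can be used without a loop by turning it into a group id: flag a row as the start of a new group when its gap to the previous message exceeds a threshold, then take a running sum of those flags over the same time ordering. The following is a minimal sketch assuming a local SparkSession (use `:paste` if trying it in spark-shell). It implements gap-based (session-style) grouping, which is a different rule from fixed X-minute windows. `maxGapSeconds`, `ts`, `prevTs`, `newGroup`, `groupId` and `msgs` are hypothetical names. A WindowSpec without `partitionBy` moves all rows into a single partition, so this is only reasonable for small data or when a partition key exists.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("gap-groups").getOrCreate()
import spark.implicits._

val df = Seq(
  ("some text from message 1", "2019-08-03 01:00:00"),
  ("some text from message 2", "2019-08-03 01:01:00"),
  ("some text from message 3", "2019-08-03 01:03:00")
).toDF("message", "timestamp")
  .withColumn("ts", to_timestamp($"timestamp"))

// Hypothetical threshold: a gap of more than 60 seconds starts a new group.
val maxGapSeconds = 60L

// No partitionBy: all rows end up in one partition (fine only for small data).
val byTime = Window.orderBy($"ts")

val grouped = df
  .withColumn("prevTs", lag($"ts", 1).over(byTime))
  // 1 marks the first row of a group; the very first row has no previous timestamp.
  .withColumn("newGroup",
    when($"prevTs".isNull || ($"ts".cast("long") - $"prevTs".cast("long")) > maxGapSeconds, 1)
      .otherwise(0))
  // Running sum of the flags assigns a group id to every row.
  .withColumn("groupId",
    sum($"newGroup").over(byTime.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
  .groupBy($"groupId")
  // Sort (ts, message) pairs so messages are joined in time order.
  .agg(sort_array(collect_list(struct($"ts", $"message"))).as("msgs"))
  .select($"groupId", concat_ws(" ", $"msgs.message").as("message"))
  .orderBy($"groupId")

grouped.show(false)
```

With a 60-second threshold, messages 1 and 2 (one minute apart) share a group and message 3 (two minutes later) starts a new one, which matches the expected output for this sample.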

I appreciate any ideas or pointers on how to accomplish this.

1 Answer


You can use the window datetime function (not to be confused with Window functions) to generate time windows, followed by a groupBy that aggregates the messages of each window with collect_list and concat_ws:

import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  ("message1", "2019-08-03 01:00:00"),
  ("message2", "2019-08-03 01:01:00"),
  ("message3", "2019-08-03 01:03:00")
).toDF("message", "timestamp")

val duration = "2 minutes"

df.
  groupBy(window($"timestamp", duration)).                     // one fixed, non-overlapping window per `duration`
  agg(concat_ws(" ", collect_list($"message")).as("message")). // join each window's messages with spaces
  show(false)
// +------------------------------------------+-----------------+
// |window                                    |message          |
// +------------------------------------------+-----------------+
// |[2019-08-03 01:00:00, 2019-08-03 01:02:00]|message1 message2|
// |[2019-08-03 01:02:00, 2019-08-03 01:04:00]|message3         |
// +------------------------------------------+-----------------+
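One caveat: Spark's documentation describes collect_list as non-deterministic, because the order of the collected values depends on row order, which may change after a shuffle. On larger data the messages inside a window could therefore be joined out of time order. A sketch of one way to pin the order, assuming a local SparkSession: collect (timestamp, message) structs, sort them with sort_array (which compares the struct's first field, the timestamp, first), then pull the message field out of the sorted array. The names `msgs`, `start` and `result` are hypothetical, and the string column is converted explicitly with to_timestamp.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("ordered-windows").getOrCreate()
import spark.implicits._

val df = Seq(
  ("message1", "2019-08-03 01:00:00"),
  ("message2", "2019-08-03 01:01:00"),
  ("message3", "2019-08-03 01:03:00")
).toDF("message", "timestamp")
  .withColumn("timestamp", to_timestamp($"timestamp"))

val duration = "2 minutes"

val result = df
  .groupBy(window($"timestamp", duration))
  // Sorting the (timestamp, message) structs fixes the order regardless of shuffle order.
  .agg(sort_array(collect_list(struct($"timestamp", $"message"))).as("msgs"))
  // Selecting a field of an array of structs yields an array of that field.
  .select($"window.start".as("start"), concat_ws(" ", $"msgs.message").as("message"))
  .orderBy($"start")

result.show(false)
```

The `start` column is kept only to order the output; it can be dropped afterwards, since the timestamp is not needed once the messages are concatenated.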

1 Comment

Thanks! This seems to be what I was looking for.
