1

I have the raw data DataFrame like that:

+-----------+--------------------+------+ |device | timestamp | value| +-----------+--------------------+------+ | device_A|2022-01-01 18:00:01 | 100| | device_A|2022-01-01 18:00:02 | 99| | device_A|2022-01-01 18:00:03 | 100| | device_A|2022-01-01 18:00:04 | 102| | device_A|2022-01-01 18:00:05 | 100| | device_A|2022-01-01 18:00:06 | 99| | device_A|2022-01-01 18:00:11 | 98| | device_A|2022-01-01 18:00:12 | 100| | device_A|2022-01-01 18:00:13 | 100| | device_A|2022-01-01 18:00:15 | 101| | device_A|2022-01-01 18:00:17 | 101| 

I'd like to aggregate them and to build the listed 10 s aggregation like that:

+-----------+--------------------+------------+-------+ |device | windowtime | values| counts| +-----------+--------------------+------------+-------+ | device_A|2022-01-01 18:00:00 |[99,100,102]|[1,3,1]| | device_A|2022-01-01 18:00:10 |[98,100,101]|[1,2,2]| 

To plot a heat-map graph of the values later.

I have succeed with getting the values column but not clear how to calculate the corresponding counts

.withColumn("values",collect_list(col("value")).over(Window.partitionBy($"device").orderBy($"timestamp".desc))) 

How can I do the weighted list aggregation in Apache Spark?

1 Answer 1

1

Group by time window using window function with duration of 10 seconds to get counts by value and device, then group by device + window_time and collect list of structs:

val result = ( df.groupBy( $"device", window($"timestamp", "10 second")("start").as("window_time"), $"value" ) .count() .groupBy("device", "window_time") .agg(collect_list(struct($"value", $"count")).as("values")) .withColumn("count", col("values.count")) .withColumn("values", col("values.value")) ) result.show() //+--------+-------------------+--------------+---------+ //| device| window_time| values| count| //+--------+-------------------+--------------+---------+ //|device_A|2022-01-01 18:00:00|[102, 99, 100]|[1, 2, 3]| //|device_A|2022-01-01 18:00:10|[100, 101, 98]|[2, 2, 1]| //+--------+-------------------+--------------+---------+ 
Sign up to request clarification or add additional context in comments.

1 Comment

Great answer, I had no idea that collect_list90 is applicable to a struct()

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.