
I have a streaming DataFrame and want to calculate the min and avg over some of its columns.

Instead of getting separate resulting columns of min and avg after applying the operations, I want to merge the min and average output into a single column.

The DataFrame looks like this:

+-----+-----+
|  1  |  2  |
+-----+-----+
| 24  | 55  |
+-----+-----+
| 20  | 51  |
+-----+-----+

I thought I'd use a Scala tuple for it, but that does not seem to work:

val res = List("1","2").map(name => (min(col(name)), avg(col(name))).as(s"result($name)")) 

All code used:

val res = List("1","2").map(name => (min(col(name)),avg(col(name))).as(s"result($name)"))
val groupedByTimeWindowDF1 = processedDf.groupBy($"xyz", window($"timestamp", "60 seconds"))
  .agg(res.head, res.tail: _*)

I'm expecting the output after applying the min and avg operations to be:

+-----------+-----------+
| result(1) | result(2) |
+-----------+-----------+
| 20, 22    | 51, 53    |
+-----------+-----------+

How should I write the expression?


2 Answers


Use the struct standard function:

struct(colName: String, colNames: String*): Column

struct(cols: Column*): Column

Creates a new struct column that composes multiple input columns.

That gives you the values as well as the names (of the columns).

val res = List("1","2").map(name => struct(min(col(name)), avg(col(name))) as s"result($name)")
                                    ^^^^^^ HERE

The power of struct shows when you want to reference a single field in the struct: you can use the field's name rather than an index.

q.select("structCol.name") 
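As a minimal sketch of the field-by-name access described above: the example below uses a small batch DataFrame as a stand-in for the streaming one, and gives the struct fields the names "min" and "avg" through aliases. The object name StructAggSketch, the helper resultColumns, and the field names are assumptions made for illustration, not part of the original question.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{avg, col, min, struct}

object StructAggSketch {
  // One struct column per input column; the field names "min" and "avg"
  // are hypothetical and come from the aliases on the aggregates.
  def resultColumns(names: List[String]): List[Column] =
    names.map { name =>
      struct(min(col(name)).as("min"), avg(col(name)).as("avg")).as(s"result($name)")
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("struct-agg-sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the streaming DataFrame from the question.
    val df = Seq((24, 55), (20, 51)).toDF("1", "2")

    val res = resultColumns(List("1", "2"))
    val aggregated = df.agg(res.head, res.tail: _*)

    // Struct fields are addressed by name, not by position.
    aggregated
      .select(
        col("result(1)").getField("min"),
        col("result(1)").getField("avg"))
      .show()

    spark.stop()
  }
}
```

In the streaming case, the same res list would be passed to the agg call after groupBy, as in the question.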

What you want to do is to merge the values of multiple columns together in a single column. For this you can use the array function. In this case it would be:

val res = List("1","2").map(name => array(min(col(name)),avg(col(name))).as(s"result($name)")) 

Which will give you:

+------------+------------+
|   result(1)|   result(2)|
+------------+------------+
|[20.0, 22.0]|[51.0, 53.0]|
+------------+------------+
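For comparison with a struct, array elements are read back by position. The sketch below assumes a small batch DataFrame in place of the streaming one, and relies on the construction order (index 0 is the min, index 1 is the avg). The object name ArrayAggSketch, the helper resultColumns, and the output aliases min_1 and avg_1 are hypothetical.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{array, avg, col, min}

object ArrayAggSketch {
  // One array column per input column: element 0 is the min, element 1 the avg.
  def resultColumns(names: List[String]): List[Column] =
    names.map(name => array(min(col(name)), avg(col(name))).as(s"result($name)"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("array-agg-sketch")
      .getOrCreate()
    import spark.implicits._

    // Batch stand-in for the streaming DataFrame from the question.
    val df = Seq((24, 55), (20, 51)).toDF("1", "2")

    val res = resultColumns(List("1", "2"))
    val aggregated = df.agg(res.head, res.tail: _*)

    // Array elements are addressed by index, so the order used in
    // resultColumns has to be remembered by the reader of the column.
    aggregated
      .select(
        col("result(1)").getItem(0).as("min_1"),
        col("result(1)").getItem(1).as("avg_1"))
      .show()

    spark.stop()
  }
}
```

An array needs a single element type, which is why the integer min appears as 20.0 next to the double avg in the output above.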

4 Comments

Thanks. In addition, how can I merge the column name into the DataFrame row? I want the result in this format: [20.0,22.0,result(1)]. What should the expression be? val res = List("1","2").map(name => array(min(col(name)),avg(col(name))).as(s"result($name)"))
@shrikrishna: If I understand correctly you want to add the column name to the array? That seems very inconvenient to be honest, but you can add it with lit(s"result$name").
@Shaido: It works for me. I need the column name for the ForeachWriter, for further operations and the DB sink class. Is there a better design?
@shrikrishna: I suspect there is, but it depends on your actual situation. I would recommend asking a new question about it with more details.
