I have a DataFrame with a structure similar to:
    root
     |-- NPAData: struct (nullable = true)
     |    |-- NPADetails: struct (nullable = true)
     |    |    |-- location: string (nullable = true)
     |    |    |-- manager: string (nullable = true)
     |    |-- service: array (nullable = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- serviceName: string (nullable = true)
     |    |    |    |-- serviceCode: string (nullable = true)
     |-- NPAHeader: struct (nullable = true)
     |    |-- npaNumber: string (nullable = true)
     |    |-- date: string (nullable = true)

What I am trying to do is:
- Group the records that have the same npaNumber into a list
- Inside each list, order the elements by their date
- Once the elements are grouped and ordered, merge them by applying some logic. To perform this last step I decided to use a map.
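To make the intent concrete, here is a minimal sketch of those three steps on plain Scala collections instead of a DataFrame. The `Npa` case class, the `GroupOrderMerge` object, and the `merge` rule (keep the newest record) are made up for illustration, and it assumes the dates are ISO-style strings such as `2017-03-05`, so that sorting them as text also sorts them chronologically.

```scala
// Hypothetical flat record standing in for one DataFrame row.
final case class Npa(npaNumber: String, date: String, location: String)

object GroupOrderMerge {
  // Placeholder merge rule: keep the first (newest) record.
  // The real merge logic is expected to be far more complex.
  def merge(newestFirst: Seq[Npa]): Npa = newestFirst.head

  def mergeByNumber(records: Seq[Npa]): Map[String, Npa] =
    records
      .groupBy(_.npaNumber) // 1. group records sharing the same npaNumber
      .map { case (number, group) =>
        // 2. order each group by date, newest first
        //    (assumes ISO-style date strings, see the note above)
        val newestFirst = group.sortBy(_.date)(Ordering[String].reverse)
        // 3. merge the ordered group into a single record
        number -> merge(newestFirst)
      }
}
```

Calling `GroupOrderMerge.mergeByNumber` on a sequence of records returns one merged record per npaNumber. The goal is to get the same behaviour on the DataFrame.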
Here is what I tried so far:
    val toUpdate = sourceDF
      .withColumn("count", count($"NPAHeader").over(
        Window.partitionBy("NPAHeader.npaNumber").orderBy($"NPAHeader.date".desc)))
      .filter($"count" > 1)

    val groupedNpa = toUpdate
      .groupBy($"NPAHeader.npaNumber")
      .agg(collect_list(struct($"NPAData", $"NPAHeader")).as("npa"))

    // This is a simplified version of my logic.
    def pickOne(List: Seq[Row]): Row = {
      println("First element: " + List.get(0))
      List.get(0)
    }

    val mergedNpa = groupedNpa.map(row =>
      (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa"))))

An example of a Row after the groupBy would be:
[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]
But I am getting an exception when I try to invoke the function from the map.
    Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
    - field (class: "org.apache.spark.sql.Row", name: "_2")
    - root class: "scala.Tuple2"
What I understand is that I cannot invoke the function pickOne() from the map (or at least not in the way I am trying to). But I don't know what I am doing wrong.
Why am I getting this exception?
Thanks for your time!
Note: I know there are easier ways to pick one element from the list without invoking a custom function. But I need to invoke it no matter what, because in the next step I will put far more complex logic there to merge the rows.
After trying Mahesh Chand Kandpal's suggestion:
    import org.apache.spark.sql.catalyst.encoders.RowEncoder

    grouped.map(row => "emdNumber: " + row.getAs[String]("emdNumber"))

    val mergedNpa = groupedNpa.map(row =>
      (row.getAs[String]("npaNumber"), pickOne(row.getAs[Seq[Row]]("npa"))(RowEncoder(row.schema))))

I get the following error:
    type mismatch;
     found   : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row]
     required: Int
How should I apply the Encoder instead?