
I have a DataFrame with 6 columns like this:

df.printSchema
root
 |-- d1: string (nullable = true)
 |-- d2: string (nullable = true)
 |-- d3: string (nullable = true)
 |-- m1: string (nullable = true)
 |-- m2: string (nullable = true)
 |-- m3: string (nullable = true)
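
A throwaway DataFrame with this shape can be built like this (the values are made up, assuming a Spark shell with spark.implicits._ in scope):

val df = Seq(
  ("a1", "b1", "c1", "1", "2", "3"),
  ("a2", "b2", "c2", "4", "5", "6")
).toDF("d1", "d2", "d3", "m1", "m2", "m3")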

For various reasons, I'd like to convert it to something like this:

root
 |-- d1: string (nullable = true)
 |-- d2: string (nullable = true)
 |-- d3: string (nullable = true)
 |-- metric: nested
 |    |-- m1: string (nullable = true)
 |    |-- m2: string (nullable = true)
 |    |-- m3: string (nullable = true)

I've spent hours on this but can't figure it out. What I've tried so far is below:

case class Metric(m1: String, m2: String, m3: String)
case class Dimension(d1: String, d2: String, d3: String, metric: Metric)

scala> df.map(row => Dimension(row.getAs[String]("d1"),
     |   row.getAs[String]("d2"),
     |   row.getAs[String]("d3"),
     |   Metric(row.getAs[String]("m1"),
     |     row.getAs[String]("m2"),
     |     row.getAs[String]("m3"))))
res48: org.apache.spark.rdd.RDD[Dimension] = MapPartitionsRDD[32] at map at <console>:46

scala> df.map(row => Dimension(row.getAs[String]("d1"),
     |   row.getAs[String]("d2"),
     |   row.getAs[String]("d3"),
     |   Metric(row.getAs[String]("m1"),
     |     row.getAs[String]("m2"),
     |     row.getAs[String]("m3")))).collect().foreach(println)
WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 220, hostname): java.lang.ClassNotFoundException: $line55.$read$$iwC$$iwC$Dimension

scala> df.map(row => Dimension(row.getAs[String]("d1"),
     |   row.getAs[String]("d2"),
     |   row.getAs[String]("d3"),
     |   Metric(row.getAs[String]("m1"),
     |     row.getAs[String]("m2"),
     |     row.getAs[String]("m3")))).toDF
res50: org.apache.spark.sql.DataFrame = [d1: string, d2: string, d3: string, metric: struct<m1:string,m2:string,m3:string>]

scala> df.map(row => Dimension(row.getAs[String]("d1"),
     |   row.getAs[String]("d2"),
     |   row.getAs[String]("d3"),
     |   Metric(row.getAs[String]("m1"),
     |     row.getAs[String]("m2"),
     |     row.getAs[String]("m3")))).toDF.select("d1").show()
ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerSQLExecutionStart(1,show at <console>:51,org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)

Please, help me. Thanks.

1 Answer


Required imports:

val spark: SparkSession = ???   // SQLContext in Spark 1.x
import org.apache.spark.sql.functions.struct
import spark.implicits._        // import sqlContext.implicits._ in Spark 1.x

Simple select:

df.select($"d1", $"d2", $"d3", struct($"m1", $"m2", $"m3").alias("metrics")) 

followed by (Spark 2.x):

.as[Dimension] 

if you want a statically typed Dataset[Dimension] instead of a DataFrame.
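
Putting it together, a minimal sketch of the typed variant, reusing the case classes from the question (assuming Spark 2.x with the imports from the answer in scope):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.struct

case class Metric(m1: String, m2: String, m3: String)
case class Dimension(d1: String, d2: String, d3: String, metric: Metric)

// The struct's field names and the alias must match the case class fields
val ds: Dataset[Dimension] = df
  .select($"d1", $"d2", $"d3", struct($"m1", $"m2", $"m3").alias("metric"))
  .as[Dimension]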
