
Spark DataFrame format conversion from input dataframe 1

| city | product | Jan (sale) | Feb (sale) | Mar (sale) |
|------|---------|------------|------------|------------|
| c1   | p1      | 123        | 22         | 34         |
| c2   | p2      | 234        | 432        | 43         |

to the output dataframe 2, where each month column is transposed into its own (metric_type, metric_value) row, as shown below.

| city | product | metric_type | metric_value |
|------|---------|-------------|--------------|
| c1   | p1      | Jan         | 123          |
| c1   | p1      | Feb         | 22           |
| c1   | p1      | Mar         | 34           |
    What is the question? Commented Jul 19, 2017 at 12:51
  • Hi Assaf, convert the first dataframe to the second dataframe without using Spark SQL. Commented Jul 20, 2017 at 4:41
  • What do you mean without using Spark SQL? Commented Jul 20, 2017 at 4:55
  • I mean we need a solution without using Spark SQL, only Dataset API methods. Commented Jul 20, 2017 at 5:26
  • Use the methods at spark.apache.org/docs/latest/api/scala/… Commented Jul 20, 2017 at 5:28

2 Answers


A Dataset-only solution would look like this:

case class orig(city: String, product: String, Jan: Int, Feb: Int, Mar: Int)
case class newOne(city: String, product: String, metric_type: String, metric_value: Int)

// Assumes a SparkSession named `spark`; its implicits provide the Encoders
// needed by toDF and .as[orig].
import spark.implicits._

val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43))
  .toDF("city", "product", "Jan", "Feb", "Mar")

val newDf = df.as[orig].flatMap(v => Seq(
  newOne(v.city, v.product, "Jan", v.Jan),
  newOne(v.city, v.product, "Feb", v.Feb),
  newOne(v.city, v.product, "Mar", v.Mar)))

newDf.show()

>>+----+-------+-----------+------------+
>>|city|product|metric_type|metric_value|
>>+----+-------+-----------+------------+
>>|  c1|     p1|        Jan|         123|
>>|  c1|     p1|        Feb|          22|
>>|  c1|     p1|        Mar|          34|
>>|  c2|     p2|        Jan|         234|
>>|  c2|     p2|        Feb|         432|
>>|  c2|     p2|        Mar|          43|
>>+----+-------+-----------+------------+

Using the DataFrame API

While the OP asked specifically for a Dataset-only solution without Spark SQL, for others who come across this question, I believe a DataFrame solution should be used.

First, it is important to understand that the Dataset API is part of the Spark SQL API. Datasets and DataFrames are interchangeable; in fact, a DataFrame is simply a Dataset[Row]. While Dataset has both a "typed" and an "untyped" API, ignoring part of that API seems wrong to me.
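To make that relationship concrete, here is a minimal sketch (assuming a spark-shell style session where the SparkSession spark and its implicits are in scope; CityProduct is a hypothetical case class):

import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._

// In Spark 2.x, DataFrame is literally `type DataFrame = Dataset[Row]`,
// so a value of one type can be used wherever the other is expected.
val asDataFrame: DataFrame = Seq(("c1", "p1")).toDF("city", "product")
val asDataset: Dataset[Row] = asDataFrame // compiles with no conversion

// Moving between the untyped and typed views is a single call each way.
case class CityProduct(city: String, product: String) // hypothetical case class
val typed: Dataset[CityProduct] = asDataFrame.as[CityProduct]
val untypedAgain: DataFrame = typed.toDF()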

Second, the pure "typed" option has limitations. For example, if we had 100 months instead of 3, writing the flatMap out by hand as above would be impractical.
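As an illustration of how the DataFrame API avoids that problem (not part of the original answer), the metric columns can be derived from the schema instead of being written out by hand:

// Sketch: treat every column that is not a key column as a metric column.
// `df` is the wide DataFrame from the examples above.
val keyColumns = Seq("city", "product")
val months = df.columns.filterNot(keyColumns.contains).toSeq
// `months` can then be fed to the array/posexplode solution shown next.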

Lastly, Spark provides a lot of optimizations on DataFrames which are unavailable when using the typed API (as the typed API is opaque to Spark), and would therefore in many cases get worse performance.

I would suggest using the following dataframe solution:

import org.apache.spark.sql.functions.{array, udf}
import spark.implicits._

val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43))
  .toDF("city", "product", "Jan", "Feb", "Mar")

val months = Seq("Jan", "Feb", "Mar")

// Collect the month columns into a single array column.
val arrayedDF = df.withColumn("combined", array(months.head, months.tail: _*))
  .select("city", "product", "combined")

// posexplode produces one row per array element along with its position.
val explodedDF = arrayedDF.selectExpr(
  "city", "product", "posexplode(combined) as (pos, metricValue)")

// Map the position back to the month name and drop it.
val u = udf((p: Int) => months(p))
val targetDF = explodedDF.withColumn("metric_type", u($"pos")).drop("pos")

targetDF.show()

>>+----+-------+-----------+-----------+
>>|city|product|metricValue|metric_type|
>>+----+-------+-----------+-----------+
>>|  c1|     p1|        123|        Jan|
>>|  c1|     p1|         22|        Feb|
>>|  c1|     p1|         34|        Mar|
>>|  c2|     p2|        234|        Jan|
>>|  c2|     p2|        432|        Feb|
>>|  c2|     p2|         43|        Mar|
>>+----+-------+-----------+-----------+

While this is a little longer, it handles the more generic case.


4 Comments

Hi Assaf, df.as[orig] is not working; I get compile-time errors: "Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases." and "not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[orig])org.apache.spark.sql.Dataset[orig]. Unspecified value parameter evidence$2."
Then the code I provided should work out of the box (I ran it on 2.1.1, but that shouldn't be an issue). Could it be that your original dataframe/dataset has an incompatible schema? If you created the dataset from something else and the case class I provided is not compatible with it, you might get an error.
Hi Assaf, the second code you provided is working, but I want to generalise it to all months of the year: val months = Seq("Jan", "Feb", "Mar", "Apr", "May", "June", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"). Maybe not all months are present in my csv? Please give me a solution.
If you only have columns for some of the months in the dataframe you can do one of two things: either remove the missing columns from your months sequence (e.g. months.filter(m => df.columns.contains(m))), or (especially if you are combining multiple dataframes) add the missing months like this: val newDF = df.withColumn("missing month name", lit(XXX)) where XXX is some value representing missing data (e.g. null), and then filter it out. A sketch of both options follows below.
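A minimal sketch of the two options from that last comment (untested; assumes the wide DataFrame df from above and the standard functions import):

import org.apache.spark.sql.functions.lit

val allMonths = Seq("Jan", "Feb", "Mar", "Apr", "May", "June",
                    "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

// Option 1: restrict the month list to the columns actually present in the CSV.
val presentMonths = allMonths.filter(m => df.columns.contains(m))

// Option 2: add each missing month as a null column so allMonths can be used
// as-is, then filter the null metric values out after the unpivot.
val completeDF = allMonths.foldLeft(df)((d, m) =>
  if (d.columns.contains(m)) d else d.withColumn(m, lit(null).cast("int")))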

You need to transform the data frame from wide to long format (also called gathering columns, or unpivoting the data frame); one option is to use flatMap:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43))
  .toDF("city", "product", "Jan", "Feb", "Mar")

df.show
+----+-------+---+---+---+
|city|product|Jan|Feb|Mar|
+----+-------+---+---+---+
|  c1|     p1|123| 22| 34|
|  c2|     p2|234|432| 43|
+----+-------+---+---+---+

// schema of the result data frame
val schema = StructType(List(
  StructField("city", StringType, true),
  StructField("product", StringType, true),
  StructField("metric_type", StringType, true),
  StructField("metric_value", IntegerType, true)))

val months = List("Jan", "Feb", "Mar")
val index = List("city", "product")

// use flatMap to convert each row into three rows
val rdd = df.rdd.flatMap(row => {
  val index_values = index.map(i => row.getAs[String](i))
  months.map(m => Row.fromSeq(index_values ++ List(m, row.getAs[Int](m))))
})

spark.createDataFrame(rdd, schema).show
+----+-------+-----------+------------+
|city|product|metric_type|metric_value|
+----+-------+-----------+------------+
|  c1|     p1|        Jan|         123|
|  c1|     p1|        Feb|          22|
|  c1|     p1|        Mar|          34|
|  c2|     p2|        Jan|         234|
|  c2|     p2|        Feb|         432|
|  c2|     p2|        Mar|          43|
+----+-------+-----------+------------+
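For readers who are not bound by the question's no-Spark-SQL constraint, the same wide-to-long transform can also be expressed with the built-in stack generator; a minimal sketch, reusing df from the answer above:

// stack(n, name1, col1, ..., nameN, colN) emits n rows of (name, value) pairs.
val longDF = df.selectExpr(
  "city", "product",
  "stack(3, 'Jan', Jan, 'Feb', Feb, 'Mar', Mar) as (metric_type, metric_value)")
longDF.show()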
