
I'd like to create a Row with a schema from a case class to test one of my map functions. The most straightforward way I can think of doing this is:

import org.apache.spark.sql.Row

case class MyCaseClass(foo: String, bar: Option[String])

def buildRowWithSchema(record: MyCaseClass): Row = {
  sparkSession.createDataFrame(Seq(record)).collect.head
}

However, this seemed like a lot of overhead to just get a single Row, so I looked into how I could directly create a Row with a schema. This led me to:

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.{Encoders, Row}

def buildRowWithSchemaV2(record: MyCaseClass): Row = {
  val recordValues: Array[Any] = record.getClass.getDeclaredFields.map((field) => {
    field.setAccessible(true)
    field.get(record)
  })
  new GenericRowWithSchema(recordValues, Encoders.product[MyCaseClass].schema)
}

Unfortunately, the Row that the second version returns is different from the first Row. Option fields in the first version are reduced to their primitive values, while they are still Options in the second version. Also, the second version is quite unwieldy.
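To illustrate, this is roughly what the two builders produce for the same record (a sketch, assuming the two definitions above and a SparkSession in scope):

val row1 = buildRowWithSchema(MyCaseClass("foo", Some("bar")))
val row2 = buildRowWithSchemaV2(MyCaseClass("foo", Some("bar")))
println(row1)  // [foo,bar]       -- the Option has been unwrapped (None would become null)
println(row2)  // [foo,Some(bar)] -- the Option is still wrapped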

Is there a better way to do this?

1 Answer


The second version returns the Option itself for the bar field, which is why you do not get the plain value as in the first version. You can unwrap the Option to get the plain value:

def buildRowWithSchemaV2(record: MyCaseClass): Row = {
  val recordValues: Array[Any] = record.getClass.getDeclaredFields.map((field) => {
    field.setAccessible(true)
    val returnValue = field.get(record)
    // Unwrap Options the same way createDataFrame does:
    // Some(x) becomes x and None becomes null.
    if (returnValue.isInstanceOf[Option[_]]) {
      returnValue.asInstanceOf[Option[Any]].getOrElse(null)
    } else {
      returnValue
    }
  })
  new GenericRowWithSchema(recordValues, Encoders.product[MyCaseClass].schema)
}
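As a quick sanity check (a sketch, assuming the question's buildRowWithSchema and a running SparkSession), both builders should now yield the same values:

val record = MyCaseClass("test", Some("value"))
println(buildRowWithSchema(record))                       // [test,value]
println(buildRowWithSchemaV2(record))                     // [test,value]
println(buildRowWithSchemaV2(MyCaseClass("test", None)))  // [test,null]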

That said, I would suggest using a DataFrame or Dataset instead.

A DataFrame and a Dataset are themselves collections of Rows with a schema.
So once you have the case class defined, you just need to encode your input data into it. For example, say you have the following input data:

val data = Seq(("test1", "value1"),("test2", "value2"),("test3", "value3"),("test4", null)) 

If you have a text file instead, you can read it with sparkContext.textFile and split each line as needed (see the sketch after the next code block).
Once your data is in a local collection or an RDD, converting it to a DataFrame or Dataset is two lines of code:

import sqlContext.implicits._

val dataFrame = data.map(d => MyCaseClass(d._1, Option(d._2))).toDF
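For the text-file route mentioned above, a minimal sketch (the file name people.txt and the comma delimiter are assumptions; the same implicits import is in scope):

val fromFile = sparkContext.textFile("people.txt")
  .map(_.split(","))
  .map { parts =>
    // the second column may be missing in the raw line
    val bar = if (parts.length > 1) Option(parts(1)) else None
    MyCaseClass(parts(0), bar)
  }
  .toDF()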

Using .toDS instead would give you a Dataset (see the sketch after the validation code below). Either way, you have a collection of Rows with a schema.
For validation you can do the following:

println(dataFrame.schema)                    // check that there is a schema
println(dataFrame.take(1).getClass.getName)  // check that it is a collection of Rows
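And the Dataset variant would look like this (a sketch, reusing the data value and implicits from above):

val dataSet = data.map(d => MyCaseClass(d._1, Option(d._2))).toDS()
println(dataSet.schema)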

Hope this gives you the answer you need.


5 Comments

take(1) is not much different albeit more efficient than collect.head. I believe the OP wants to entirely skip the creation of an RDD.
@HristoIliev, I did take(1) just to validate/show that a dataframe or dataset is a collection of Rows with a schema.
But the OP wants a single Row, not a DataSet or a DataFrame. Now, I'm not really sure why, because he can easily test a map function on a distributed collection with a single element.
@HristoIliev, yes, I agree that the OP wants a single Row for each operation. But that would create many tasks and a lot of memory usage when working with big data. Since a DataFrame or Dataset is itself a collection of Rows with a schema, I would recommend using those instead. Don't you agree?
He is trying to test something, not create big data row by row.
