
I would like to write a generic method with three input parameters:

  1. filePath - String
  2. schema - ?
  3. case class

So, my idea is to write a method like this:

def load_sms_ds(filePath: String, schemaInfo: ?, cc: ?) = {
  val ds = spark.read
    .format("csv")
    .option("header", "true")
    .schema(?)
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
    .load(filePath)
    .as[?]

  ds
}

and have it return a Dataset depending on the input parameters. I am not sure, though, what types the parameters schemaInfo and cc should be.

1 Answer


First of all, I would recommend reading the Spark SQL programming guide. It contains some things that I think will generally help you as you learn Spark.

Let's run through the process of reading in a CSV file using a case class to define the schema.

First, add the various imports needed for this example:

import java.io.{File, PrintWriter}                   // for reading / writing the example data
import org.apache.spark.sql.types.StructType         // to define the schema
import org.apache.spark.sql.catalyst.ScalaReflection // used to generate the schema from a case class
import scala.reflect.runtime.universe.TypeTag        // used to provide type information of the case class at runtime
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.Encoder                  // used by spark's .as[T] to create the Dataset[T]

Define a case class; the different field types available can be found in the Spark SQL data types documentation:

case class Example(
  stringField : String,
  intField    : Int,
  doubleField : Double
)

Add the method for extracting a schema (StructType) given a case class type as a parameter:

// T : TypeTag means that an implicit value of type TypeTag[T] must be available at
// the method call site. Scala will automatically generate this for you.
def schemaOf[T: TypeTag]: StructType = {
  ScalaReflection
    .schemaFor[T]              // this method requires a TypeTag for T
    .dataType
    .asInstanceOf[StructType]  // cast it to a StructType, which is what spark requires as its schema
}
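To see what this produces, you can call it on the case class defined above. A quick sketch of what to expect (the nullability flags follow Spark's reflection rules: primitive fields such as Int and Double are non-nullable, String is nullable):

val exampleSchema: StructType = schemaOf[Example]
exampleSchema.printTreeString()
// should print something like:
// root
//  |-- stringField: string (nullable = true)
//  |-- intField: integer (nullable = false)
//  |-- doubleField: double (nullable = false)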

Define a method to read in a CSV file from a path, with the schema defined using a case class:

// The implicit Encoder is needed by the `.as` method in order to create the Dataset[T].
// The TypeTag is required by the schemaOf[T] call.
def readCSV[T : Encoder : TypeTag](
  filePath: String
)(implicit spark: SparkSession): Dataset[T] = {
  spark.read
    .option("header", "true")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
    .schema(schemaOf[T])
    .csv(filePath)  // spark provides this more explicit call for reading a csv file;
                    // by default it uses a comma as the separator, but this can be changed
    .as[T]
}
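For reference, the `[T : Encoder : TypeTag]` context bounds are just shorthand for extra implicit parameters. Here is a minimal sketch of an equivalent, desugared signature (illustration only; readCSVDesugared is a hypothetical name, not something you need to define):

// Equivalent to readCSV above, with the context bounds written out as implicit parameters
def readCSVDesugared[T](
  filePath: String
)(implicit spark: SparkSession, enc: Encoder[T], tag: TypeTag[T]): Dataset[T] = {
  spark.read
    .option("header", "true")
    .schema(schemaOf[T])  // uses the TypeTag
    .csv(filePath)
    .as[T]                // uses the Encoder
}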

Create a SparkSession:

implicit val spark = SparkSession.builder().master("local").getOrCreate() 

Write some sample data to a temp file:

val data = s"""|stringField,intField,doubleField |hello,1,1.0 |world,2,2.0 |""".stripMargin val file = File.createTempFile("test",".csv") val pw = new PrintWriter(file) pw.write(data) pw.close() 

An example of calling this method:

import spark.implicits._  // so that an implicit Encoder gets pulled in for the case class

val df = readCSV[Example](file.getPath)
df.show()
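With the two sample rows written above, df.show() should produce output roughly like the following (column widths may vary slightly):

// +-----------+--------+-----------+
// |stringField|intField|doubleField|
// +-----------+--------+-----------+
// |      hello|       1|        1.0|
// |      world|       2|        2.0|
// +-----------+--------+-----------+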

6 Comments

Hi Steve, thank you for your help! Could you please explain in more detail this part: ...[T : Encoder : TypeTag]... and also how would one use (invoke) the method loadSmsDs?
That part means that an implicit encoder and type tag are needed in order to use that method. The encoder can be imported from sparkSession.implicits._. I will update the answer later this evening.
I'm trying to understand the code you've provided, but to be honest, at the moment it's probably too advanced for me :) Anyway, what I'm trying to do is invoke the method loadSmsDs where I have previously defined filePath, a StructType (using Encoders.product[some case class].schema), and as the third parameter that same case class used to create the StructType.
After some time spent trying to invoke the method, I get the error: "notebook:1: error: ambiguous implicit values: both method newIntEncoder in class SQLImplicits of type => org.apache.spark.sql.Encoder[Int] and method newLongEncoder in class SQLImplicits of type => org.apache.spark.sql.Encoder[Long] match expected type org.apache.spark.sql.Encoder[T]". Any thoughts on this?
I have made some edits to the above. I hope this helps solve your problem and please let me know if anything is unclear.
