First of all, I would recommend reading the Spark SQL programming guide. It contains some things that I think will generally help you as you learn Spark.
Let's run through the process of reading in a CSV file, using a case class to define the schema.
First, add the various imports needed for this example:
    import java.io.{File, PrintWriter} // for reading / writing the example data
    import org.apache.spark.sql.types.StructType // to define the schema
    import org.apache.spark.sql.catalyst.ScalaReflection // used to generate the schema from a case class
    import scala.reflect.runtime.universe.TypeTag // used to provide type information of the case class at runtime
    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.Encoder // used by `.as[T]` to convert Rows into instances of the case class
Define a case class; the different types available can be found here:
    case class Example(
      stringField: String,
      intField: Int,
      doubleField: Double
    )
Add a method for extracting a schema (`StructType`) given a case class type as a parameter:
    // T : TypeTag means that an implicit value of type TypeTag[T] must be
    // available at the method call site. Scala will automatically generate this
    // for you. See [here][3] for further details.
    def schemaOf[T: TypeTag]: StructType = {
      ScalaReflection
        .schemaFor[T] // this method requires a TypeTag for T
        .dataType
        .asInstanceOf[StructType] // cast it to a StructType, which is what Spark requires as its schema
    }
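As a sanity check, you can print the schema this derives for the `Example` case class; it should print something like the comments below (the nullability flags come from how Spark maps Scala types, e.g. primitives like `Int` are non-nullable):

    schemaOf[Example].printTreeString()
    // root
    //  |-- stringField: string (nullable = true)
    //  |-- intField: integer (nullable = false)
    //  |-- doubleField: double (nullable = false)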
Define a method to read in a CSV file from a path, with the schema defined by a case class:
    // The implicit Encoder is needed by the `.as` method in order to create the
    // Dataset[T]. The TypeTag is required by the schemaOf[T] call.
    def readCSV[T: Encoder : TypeTag](filePath: String)(implicit spark: SparkSession): Dataset[T] = {
      spark.read
        .option("header", "true")
        .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
        .schema(schemaOf[T])
        .csv(filePath) // spark provides this more explicit call to read from a csv file; by default it uses a comma as the separator, but this can be changed
        .as[T]
    }
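For example, if your files were semicolon-separated instead, you could pass Spark's `sep` option through. A minimal sketch (the `readSemicolonCSV` name is just illustrative):

    def readSemicolonCSV[T: Encoder : TypeTag](filePath: String)(implicit spark: SparkSession): Dataset[T] = {
      spark.read
        .option("header", "true")
        .option("sep", ";") // override the default comma separator
        .schema(schemaOf[T])
        .csv(filePath)
        .as[T]
    }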
Create a `SparkSession`:
    implicit val spark = SparkSession.builder().master("local").getOrCreate()
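If you're running this as a standalone application rather than in a shell, you may also want to name the app and stop the session when you're done. A small optional sketch (the app name is just an example):

    implicit val spark = SparkSession.builder()
      .appName("csv-example") // arbitrary example name
      .master("local[*]")     // use all available local cores
      .getOrCreate()

    // ... and at the end of the program:
    // spark.stop()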
Write some sample data to a temp file:
    val data =
      s"""|stringField,intField,doubleField
          |hello,1,1.0
          |world,2,2.0
          |""".stripMargin

    val file = File.createTempFile("test", ".csv")
    val pw = new PrintWriter(file)
    pw.write(data)
    pw.close()
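Optionally, you can have the temp file removed automatically when the JVM exits:

    file.deleteOnExit()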
An example of calling this method:
    import spark.implicits._ // so that an implicit Encoder gets pulled in for the case class
    val df = readCSV[Example](file.getPath)
    df.show()
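With the sample data above, `df.show()` should print something like:

    +-----------+--------+-----------+
    |stringField|intField|doubleField|
    +-----------+--------+-----------+
    |      hello|       1|        1.0|
    |      world|       2|        2.0|
    +-----------+--------+-----------+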