
I'm trying to create a generic Dataset[T] reader in order to avoid calling dataframe.as[..] for each reader call. There is support for primitive types and case classes, so I was thinking of something like:

def read[T <: Product](sql: String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

But I'm getting an 'Unable to find encoder for type stored in Dataset' error. Is it possible to do something like that?

Second attempt:

def read[T <: Product](sql: String): Dataset[T] = {
  import sparkSession.implicits._
  innerRead(sql)
}

private def innerRead[T <: Product : Encoder](sql: String): Dataset[T] = {
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

This ends with a type mismatch (found Encoder[Nothing], required Encoder[T]).

I also tried importing only newProductEncoder, but it ended the same way.

2 Answers


In order to convert a DataFrame to a Dataset you need to have an Encoder. You can do this by simply adding a context bound requiring an Encoder for T:

def read[T <: Product : Encoder](sql: String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

A context bound is syntactic sugar for the following:

def read[T <: Product](sql: String)(implicit $ev: Encoder[T]): Dataset[T]

which means that you need to have in the implicit context one (and only one) instance of an Encoder[T].

This is needed because the as method itself requires this context bound.
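For reference, the signature of as on Dataset looks roughly like the following (simplified here), which is where the Encoder requirement comes from:

def as[U : Encoder]: Dataset[U]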

Spark itself can provide most of the Encoders you may need (primitives, Strings and case classes so far) by importing, as you did, the implicits for your SparkSession. These, however, must be available in the implicit scope at the call site, meaning that what you want is probably more like the following:

def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
  import spark.implicits._
  val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
  df.as[T]
}

val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._

val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")
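For example, with a hypothetical case class whose fields match the columns returned by your query (Person and the table name here are placeholders, just for illustration):

import org.apache.spark.sql.{Dataset, SparkSession}

// Case classes used as Dataset element types are typically defined
// outside the method in which the Encoder is resolved.
case class Person(name: String, age: Int)

val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._ // brings Encoder derivation for case classes into scope

val people: Dataset[Person] = read[Person](spark, "select name, age from people")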

2 Comments

Thanks, I extended the original question with new details.
read doesn't specify the context bound, which means that innerRead cannot see it. The context bound must be propagated throughout your calls. However, in order for the implicits to be available at the right moment, you have to import them at the call site rather than where you define your method.

In your second attempt, perhaps you need to give the type parameter explicitly in your innerRead call:

innerRead[T](sql) 
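Combined with the context bound mentioned in the comment on the first answer, a minimal sketch of the second attempt might look like this (assuming a sparkSession value in scope, as in the question):

import org.apache.spark.sql.{DataFrame, Dataset, Encoder}

def read[T <: Product : Encoder](sql: String): Dataset[T] = {
  innerRead[T](sql) // type parameter passed explicitly, bound propagated from read
}

private def innerRead[T <: Product : Encoder](sql: String): Dataset[T] = {
  val df: DataFrame = sparkSession.sqlContext.read.option("query", sql).load()
  df.as[T]
}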

Comments
