
I've been struggling with `org.apache.spark.SparkException: Task not serializable`, but I finally figured out how to make this work:

```scala
import org.apache.spark.rdd.RDD

// Case classes are already Serializable, so `extends Serializable` is redundant but harmless
case class Article(id: Int, title: String, content: String) extends Serializable

// articlesRDD: RDD[Article], defined elsewhere
val index: RDD[(String, List[(Int, Int)])] =
  (for {
    article <- articlesRDD
    text = article.title + article.content
    word <- text.split(" ")
  } yield (word, (article.id, 1)))
    .groupByKey()
    .mapPartitions {
      _.map {
        case (k, v) => (k, v.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList) // Works as expected
        //case (k, v) => (k, reducer(v.toList)) // Fails
      }
    }
    .cache()
```

And here is reducer:

```scala
def reducer(list: List[(Int, Int)]): List[(Int, Int)] = {
  list.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList
}
```

I also tried defining `reducer` as a `val`, but I get the same error. Actually, the error only occurs in a Databricks notebook; on my machine, running Spark in local mode, it works fine.

Why does the commented-out case statement fail? Do I always have to pass anonymous functions, even when they are not as trivial as my `reducer` function?

Thanks in advance :)

1 Answer


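You don't say where `reducer` is defined, but it is most likely a method of a non-serializable class (e.g. the one containing the `SparkContext`). Calling it inside the closure is really a call to `this.reducer(...)`, so the closure captures the instance it is called on, and Spark then has to serialize that whole instance along with the task. The same applies if `reducer` is a `val` field of that class, since reading the field also goes through `this`. Define `reducer` in an object instead.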
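A minimal sketch of that fix, assuming `reducer` is moved unchanged into a top-level object; the names `Reducers`, `ReducerDemo` and `perPartition` are hypothetical. It runs without Spark: `perPartition` has the same shape as the function the question passes to `mapPartitions`, so it can be exercised on a plain `Iterator`.

```scala
// Hypothetical home for the reducer: a top-level singleton object,
// so calling it from a closure does not capture any enclosing class instance.
object Reducers {
  // Sums the counts per article id, e.g. (1,1), (2,1), (1,1) -> (1,2), (2,1)
  def reducer(list: List[(Int, Int)]): List[(Int, Int)] =
    list.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList
}

object ReducerDemo {
  // Same shape as the function passed to mapPartitions in the question:
  // it transforms one partition's iterator of (word, postings) pairs.
  val perPartition: Iterator[(String, Iterable[(Int, Int)])] => Iterator[(String, List[(Int, Int)])] =
    _.map { case (k, v) => (k, Reducers.reducer(v.toList)) }

  def main(args: Array[String]): Unit = {
    val partition = Iterator(
      "spark" -> Iterable((1, 1), (2, 1), (1, 1)),
      "scala" -> Iterable((2, 1))
    )
    // groupBy does not guarantee an order, so sort before printing
    perPartition(partition).foreach { case (word, postings) =>
      println(s"$word -> ${postings.sortBy(_._1)}")
    }
    // spark -> List((1,2), (2,1))
    // scala -> List((2,1))
  }
}
```

In the question's pipeline, the commented-out case would then read `case (k, v) => (k, Reducers.reducer(v.toList))`. Because `perPartition` is an `Iterator[T] => Iterator[U]`, it could also be passed directly as `.mapPartitions(ReducerDemo.perPartition)`.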

From the Spark Programming Guide:

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

  • Anonymous function syntax, which can be used for short pieces of code.
  • Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

```scala
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
```

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method.
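The guide's last point can be checked without a cluster. The sketch below mimics Spark's check by writing each closure with Java serialization, which is what Spark uses for closures. It assumes Scala 2.12 or later, where function literals compile to serializable lambdas. `Job`, `MyReducers` and `SerializationCheck` are hypothetical names; `Job` stands in for a non-serializable driver class. The closure built inside `Job` calls `this.reducer`, so it drags the `Job` instance along and fails; the closure calling the object's method captures nothing.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-side class (e.g. one holding a SparkContext);
// it does not extend Serializable.
class Job {
  def reducer(list: List[(Int, Int)]): List[(Int, Int)] =
    list.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList

  // Like the question's failing case: the body calls reducer, i.e. this.reducer,
  // so the function value captures this Job instance.
  def closure: List[(Int, Int)] => List[(Int, Int)] = xs => reducer(xs)
}

// The same logic in a singleton object, as the guide recommends.
object MyReducers {
  def reducer(list: List[(Int, Int)]): List[(Int, Int)] =
    list.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList
}

object SerializationCheck {
  // Calls the object's method; MyReducers is accessed statically,
  // so no enclosing instance is captured.
  def objectClosure: List[(Int, Int)] => List[(Int, Int)] = xs => MyReducers.reducer(xs)

  // Roughly what Spark does before shipping a task: Java-serialize the closure.
  def serializes(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"closure over a Job instance serializes: ${serializes(new Job().closure)}") // false
    println(s"closure over MyReducers serializes: ${serializes(objectClosure)}")         // true
  }
}
```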


Comment:

Actually, the error occurs in a Databricks notebook. On my machine, running Spark in local mode, it works fine. Sorry, I forgot to add that little detail.
