I've been struggling with `org.apache.spark.SparkException: Task not serializable`, but I finally figured out how to make this work:
```scala
case class Article(id: Int, title: String, content: String) extends Serializable

val index: RDD[(String, List[(Int, Int)])] =
  (for {
    article <- articlesRDD
    text = article.title + article.content
    word <- text.split(" ")
  } yield (word, (article.id, 1)))
    .groupByKey()
    .mapPartitions {
      _.map {
        case (k, v) => (k, v.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList) // Works as expected
        //case (k, v) => (k, reducer(v.toList)) // Fails
      }
    }
    .cache()
```

And here is `reducer`:
```scala
def reducer(list: List[(Int, Int)]): List[(Int, Int)] = {
  list.groupBy(_._1).map(
    pair => (pair._1, pair._2.map(_._2).sum)
  ).toList
}
```

I also tried defining `reducer` as a `val`, but I get the same error. The error only occurs in a Databricks notebook; on my machine, running Spark in local mode, it works fine.
Why does the commented-out case statement fail? Do I always have to pass anonymous functions, even when they are not as trivial as my reducer function?
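In case it helps frame the question: the workaround I'm considering (only a sketch, I haven't verified it in Databricks yet) is to move `reducer` into a standalone object so the closure only references that singleton instead of dragging in the notebook's enclosing class. The object name below is just a placeholder:

```scala
// Placeholder name; a top-level holder for the helper so the lambda
// inside mapPartitions references only this serializable singleton
// rather than the notebook's wrapper class.
object WordCountHelpers extends Serializable {
  def reducer(list: List[(Int, Int)]): List[(Int, Int)] =
    list.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList
}

// and then inside the transformation:
//   case (k, v) => (k, WordCountHelpers.reducer(v.toList))
```

Is that the idiomatic fix, or is there a cleaner way?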
Thanks in advance :)