
I am trying to define a UDF in Spark (2.0) from a string containing a Scala function definition. Here is the snippet:

val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
import universe._
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val toolbox = currentMirror.mkToolBox()
val f = udf(toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int])
sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show

This gives me an error:

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

However, when I define the UDF as:

val f = udf((s:String) => 5) 

it works just fine. What is the issue here? The end objective is to take a string containing the definition of a Scala function and use it as a UDF.

  • It seems like you are running into this problem - issues.apache.org/jira/browse/SPARK-9219 Commented Aug 11, 2016 at 21:31
  • @vsminkov It is not that. Commented Aug 11, 2016 at 21:59
  • Man, you could not have combined two more complex and ugly monsters than Spark and scala reflection. :) Commented Aug 12, 2016 at 7:54
  • Hazards of the profession! :) Commented Aug 16, 2016 at 7:06

2 Answers


As Giovanny observed, the problem lies in the class loaders being different (you can investigate this more by calling .getClass.getClassLoader on whatever object). Then, when the workers try to deserialize your reflected function, all hell breaks loose.
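For example, a quick check along those lines (a sketch for spark-shell, reusing the toolbox from the question; the value names are just for illustration) shows that the toolbox-compiled function comes from a different class loader than an ordinary lambda:

// Sketch: compare the class loaders of a toolbox-compiled function and a plain lambda.
val compiled = toolbox.eval(toolbox.parse("(s: String) => 5")).asInstanceOf[String => Int]
val plain    = (s: String) => 5

println(compiled.getClass.getClassLoader) // the toolbox's own class loader
println(plain.getClass.getClassLoader)    // the REPL/application class loader

The two printed loaders differ, and the executors only have the latter, which is consistent with deserialization failing on their side.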

Here is a solution that does not involve any class loader hackery. The idea is to move the reflection step to the workers. We'll end up having to redo the reflection step, but only once per worker. I think this is pretty optimal - even if you did the reflection only once on the master node, you would have to do a fair bit of work per worker to get them to recognize the function.

val f = udf {
  new Function1[String, Int] with Serializable {
    import scala.reflect.runtime.universe._
    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox

    lazy val toolbox = currentMirror.mkToolBox()

    lazy val func = {
      println("reflected function") // triggered at every worker
      toolbox.eval(toolbox.parse("(s:String) => 5")).asInstanceOf[String => Int]
    }

    def apply(s: String): Int = func(s)
  }
}

Then, calling sc.parallelize(Seq("1","5")).toDF.select(f(col("value"))).show works just fine.

Feel free to comment out the println - it is just an easy way of counting how many times the reflection happened. In spark-shell --master 'local' that's only once, but in spark-shell --master 'local[2]' it's twice.

How it works

The UDF gets evaluated immediately, but it never gets used until it reaches the worker nodes, so the lazy values toolbox and func only get evaluated on the workers. Furthermore, since they are lazy, they only ever get evaluated once per worker.
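As a minimal, Spark-free sketch of the lazy val behaviour this relies on (the object and member names here are just for illustration):

object LazyDemo extends Serializable {
  lazy val value: Int = {
    println("evaluated") // runs only on first access, then the result is cached
    42
  }
}

LazyDemo.value // prints "evaluated", returns 42
LazyDemo.value // returns the cached 42, prints nothing

In the UDF above, that first access happens inside apply on an executor, so the toolbox and the reflected function are built there rather than shipped from the driver.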


1 Comment

Is it possible to detect the types instead of specifying the String and Int types? If so, it could handle any kind of UDF.

I had the same error. It doesn't show a ClassNotFoundException because the JavaDeserializationStream class is catching that exception; depending on your environment, it fails because it couldn't find the class you're trying to execute from your RDD/Dataset, but the ClassNotFoundError is swallowed. To fix this issue I had to generate a jar with all the classes in my project (including the function and its dependencies) and include that jar in the Spark environment.

This is for a standalone cluster:

conf.setJars(Array("/fullpath/yourgeneratedjar.jar", "/fullpath/otherdependencies.jar"))

and this for a YARN cluster:

conf.set("spark.yarn.jars", "/fullpath/yourgeneratedjar.jar,/fullpath/otherdependencies.jar") 

4 Comments

Tried adding org.scala-lang:scala-compiler:2.11.8 and org.scala-lang:scala-reflect:2.11.8 specifically to the --packages list, but the error is still the same. I anyhow include all my application dependencies as a list of Maven coordinates before running the job.
@sourabh I think I found the issue: when you use reflection to generate the function, the function is only available to the local class loader. Once Spark tries to deserialize the function, it throws a ClassNotFoundException because the function is not available to the workers' class loaders. Check the generated classes when you use val f = udf((s:String) => 5) and you'll see a MyObject$$anonfunc$...class for the function. I suggest generating the .class file using the Scala interpreter and dynamically generating a jar containing this class.
@GiovannyGutierrez Can you move last comment to the answer and expand on it?
I think @alec's approach using lazy initialization is the better way to solve the issue, and no further changes are necessary.
