22

I need to pass a list into a UDF; the list will determine the score/category of the distance. For now, I am hard coding all distances to map to the 4th score.

a = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)], ["Letter", "distances"])

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def cate(label, feature_list):
    if feature_list == 0:
        return label[4]

label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

udf_score = udf(cate, StringType())
a.withColumn("category", udf_score(label_list, a["distances"])).show(10)

When I try something like this, I get this error:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

3 Answers

45
from pyspark.sql.functions import udf, col

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80)], ["Letter", "distances"])

label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(label, feature_list):
    if feature_list == 0:
        return label[4]
    else:
        # you may need to add an 'else' condition as well, otherwise 'null' will be added in this case
        return 'I am not sure!'

def udf_score(label_list):
    return udf(lambda l: cate(l, label_list))

a.withColumn("category", udf_score(label_list)(col("distances"))).show()

Output is:

+------+---------+--------------+
|Letter|distances|      category|
+------+---------+--------------+
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
+------+---------+--------------+

3 Comments

People say we can use pyspark.sql.functions.array() to directly pass a list to a UDF (from Spark 2.2.0 onwards). How can I rewrite the above example using array()? (A possible sketch follows after these comments.)
Something is wrong with your solution: it always goes to the else part only.
Should be if len(feature_list) == 0?
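
One possible way to use array(), as asked in the first comment: this is only a sketch, assuming Spark 2.2+ and the DataFrame a from the question, and is not code from any of the answers. It wraps the Python list in an array of literal columns so Spark can pass it to the UDF next to the distance column.

from pyspark.sql.functions import array, lit, udf, col
from pyspark.sql.types import StringType

label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

def cate(label, feature):
    # the array column arrives in the UDF as an ordinary Python list
    if feature == 0:
        return label[4]
    else:
        return 'I am not sure!'

udf_score = udf(cate, StringType())

# wrap the Python list in an array of literal columns so Spark can hand it to the UDF
a.withColumn(
    "category",
    udf_score(array(*[lit(x) for x in label_list]), col("distances"))
).show()

Inside the UDF the array column behaves like a normal Python list, so label[4] works exactly as in the original function.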
19

Try currying the function, so that the only argument in the DataFrame call is the name of the column on which you want the function to act:

udf_score = udf(lambda x: cate(label_list, x), StringType())
a.withColumn("category", udf_score("distances")).show(10)
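
If you prefer not to write the lambda yourself, functools.partial does the same binding. This is just a sketch reusing cate and label_list from the question, not part of the original answer:

from functools import partial
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# partial() fixes the first argument (the label list), leaving only the distance free
udf_score = udf(partial(cate, label_list), StringType())
a.withColumn("category", udf_score("distances")).show(10)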

Comments

5

I think this may help: pass the list as the default value of a function argument.

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# sample data
a = sqlContext.createDataFrame([("A", 20), ("B", 30), ("D", 80), ("E", 0)], ["Letter", "distances"])

label_list = ["Great", "Good", "OK", "Please Move", "Dead"]

# passing the list as the default value of an argument
def cate(feature_list, label=label_list):
    if feature_list == 0:
        return label[4]
    else:
        # you may need to add an 'else' condition as well, otherwise 'null' will be added in this case
        return 'I am not sure!'

udfcate = udf(cate, StringType())

a.withColumn("category", udfcate("distances")).show()

Output:

+------+---------+--------------+
|Letter|distances|      category|
+------+---------+--------------+
|     A|       20|I am not sure!|
|     B|       30|I am not sure!|
|     D|       80|I am not sure!|
|     E|        0|          Dead|
+------+---------+--------------+

2 Comments

Sorry for downvoting; I feel the question is more about how to send both arguments to the function, rather than always using one argument by default. In cases where label_list is not defined in the global scope and you need to send that list dynamically, this solution doesn't cope. ags29 and @Prem answered it precisely. I was looking for a similar solution myself.
Amith, the list is static in the example, so it is a fair possible solution for a static list.
