0

I'm creating a DF by reading a csv file in Pyspark and then converting into RDD to apply UDF. It throws an error while applying the UDF.

Here's my code snippet -

# My UDF definition def my_udf(string_array): // some code // return float_var spark.udf.register("my_udf", my_udf, FloatType()) #Read from csv file read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true") rdd = read_data.rdd get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"]) 

Sample data in read_data DF -

[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')] 

The schema of the DF created by reading from CSV file -

print (read_data.schema) StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true))) 

I get following error while applying UDF at the get_df line -

Traceback (most recent call last): File "", line 1, in File "/usr/lib/spark/python/pyspark/sql/session.py", line 58, in toDF return sparkSession.createDataFrame(self, schema, sampleRatio) File "/usr/lib/spark/python/pyspark/sql/session.py", line 746, in createDataFrame rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) File "/usr/lib/spark/python/pyspark/sql/session.py", line 390, in _createFromRDD struct = self._inferSchema(rdd, samplingRatio, names=schema) File "/usr/lib/spark/python/pyspark/sql/session.py", line 377, in _inferSchema raise ValueError("Some of types cannot be determined by the " ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

Can anyone please help me in passing the array (datatype is string) to the UDF?

5
  • Why are you using an RDD instead of the dataframe? Commented Feb 21, 2020 at 15:40
  • Hi @cricket_007, I admit that I'm new to Pyspark and somebody advised me that when applying UDF we should convert DF to RDD so that we can use map which make the execution parallel instead of serial when using DF. Please correct me if this understanding is not correct, it'll be really helpful for me. Commented Feb 21, 2020 at 20:36
  • UDFs are a SparkSQL concept only to be used in DataFrames. In RDD terms, they are just methods. Commented Feb 21, 2020 at 20:40
  • Hi @cricket_007, So there is nothing like parallel execution of the UDF (method) with "map" function of RDD. Commented Feb 21, 2020 at 20:58
  • map() is a method of both dataframes and RDDs. Both operate in parallel. In order to use a UDF, you would go through the SparkSQL API - docs.databricks.com/spark/latest/spark-sql/udf-python.html Commented Feb 21, 2020 at 23:00

1 Answer 1

1

Two things:

  1. if convert DF to RDD you don't need to register my_udf as a udf. If you register udf, you directly apply to df like read_data.withColumn("col3", my_udf(F.col("col3")))

  2. the problem you encountered is at toDF step, that you dont specify the schema of the new DF when converted from RDD and spark is trying to infer type from sample data, but in your case, the implicit type hint is not working. I will manually create the schema and pass into the toDF like this

from pyspark.sql.types import StringType, FloatType, StructField, StructType 
get_schema = StructType( [StructField('col1', StringType(), True), StructField('col2', StringType(), True) StructField('col3', FloatType(), True)] ) get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(get_schema) 
Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.