I am using Spark 2.3 and working on a POC where I have to load a bunch of CSV files into Spark dataframes.
Consider the CSV below as a sample that I need to parse and load into a dataframe. It contains several bad records that need to be identified.
```
id,name,age,loaded_date,sex
1,ABC,32,2019-09-11,M
2,,33,2019-09-11,M
3,XYZ,35,2019-08-11,M
4,PQR,32,2019-30-10,M    #invalid date
5,EFG,32,                #missing other column details
6,DEF,32,2019/09/11,M    #invalid date format
7,XYZ,32,2017-01-01,9    #last column has to be character only
8,KLM,XX,2017-01-01,F
9,ABC,3.2,2019-10-10,M   #decimal value for integer data type
10,ABC,32,2019-02-29,M   #invalid date
```

This would have been an easy task if I had to parse it using Python or pandas functions.
This is how I defined the schema and read the file:
```
from pyspark.sql.types import *

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("loaded_date", DateType(), True),
    StructField("sex", StringType(), True),
    StructField("corrupt_record", StringType(), True)])

df = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("nanValue", "0") \
    .option("nullValue", " ") \
    .option("treatEmptyValuesAsNulls", "false") \
    .option("columnNameOfCorruptRecord", "corrupt_record") \
    .schema(schema).load(file)

>>> df.show(truncate=False)
+----+----+----+-----------+----+----------------------+
|id  |name|age |loaded_date|sex |corrupt_record        |
+----+----+----+-----------+----+----------------------+
|1   |ABC |32  |2019-09-11 |M   |null                  |
|2   |null|33  |2019-09-11 |M   |null                  |
|3   |XYZ |35  |2019-08-11 |M   |null                  |
|4   |PQR |32  |2021-06-10 |M   |null                  |
|5   |EFG |32  |null       |null|5,EFG,32,             |
|null|null|null|null       |null|6,DEF,32,2019/09/11,M |
|7   |XYZ |32  |2017-01-01 |9   |null                  |
|null|null|null|null       |null|8,KLM,XX,2017-01-01,F |
|null|null|null|null       |null|9,ABC,3.2,2019-10-10,M|
|10  |ABC |32  |2019-03-01 |M   |null                  |
+----+----+----+-----------+----+----------------------+
```

The code above has parsed most records as expected but has failed to catch the invalid dates; see records '4' and '10'. Instead of rejecting the out-of-range values, the lenient date parsing has rolled them over into junk dates: month 30 in record 4 became 2021-06-10, and 2019-02-29 in record 10 became 2019-03-01.
I could load the dates as string type and then use a custom UDF or a cast to parse them and check whether each date is valid. Is there any way to catch invalid dates at read time in the first place, without a custom UDF or extra logic later in the code?
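For reference, this is the kind of post-load workaround I have in mind (just a rough sketch: read loaded_date as a plain string, then validate it with to_date, which as far as I know returns null for strings it cannot parse). I would prefer not to need this second pass:

```
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Same schema as before, but loaded_date kept as a plain string
raw_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("loaded_date", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("corrupt_record", StringType(), True)])

raw_df = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("columnNameOfCorruptRecord", "corrupt_record") \
    .schema(raw_schema).load(file)

# to_date() yields null when the string does not match the pattern,
# so a non-null raw string with a null parsed date marks a bad record
checked = raw_df.withColumn("parsed_date",
                            F.to_date("loaded_date", "yyyy-MM-dd"))
bad_dates = checked.filter(F.col("loaded_date").isNotNull()
                           & F.col("parsed_date").isNull())
```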
Also, I was looking for some way to handle record '7', which has a numeric value in the last column even though that column should contain characters only.
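The only idea I have so far is yet another post-load filter, sketched below (assuming valid values are alphabetic codes such as 'M'/'F'), but ideally the read itself would route such rows into corrupt_record:

```
from pyspark.sql import functions as F

# Flag rows whose last column is not purely alphabetic
bad_sex = df.filter(F.col("sex").isNotNull()
                    & ~F.col("sex").rlike("^[A-Za-z]+$"))
```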