
I am trying to read a pyspark dataframe with json column on databricks.

The dataframe:

year  month  json_col
2010  09     [{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]
2010  09     [{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]
2007  10     [{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]
2007  10     [{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]

I need a new dataframe in which all duplicated "p_id" values are removed and the remaining values are aggregated by year and month:

year  month  p_id (string)
2010  09     ["vfdvtbe", "cdscs", "usdvwq", "ujhbe", "yjev"]
2007  10     ["ukerge", "ikrtw", "ikwca", "unvwq", "cqwcq"]

The new column "p_id" is a string representation of an array. I would like to find out which distinct "p_id" values occur in each year and month and how many there are, and also remove the duplicated elements that appear within the same year and month.

My code:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType

schema = ArrayType(StructType([StructField('p_id', StringType(), True)]))
# the second assignment overwrites the struct schema above
schema = ArrayType(MapType(StringType(), StringType()))

t = ff.withColumn("data", F.explode(F.from_json(F.col("json_col"), schema))) \
    .withColumn("data", F.when(F.col("data")["p_id"].cast("string").isNotNull(), F.col("data")["p_id"])) \
    .filter(F.col("data").isNotNull()) \
    .drop("json_col")
display(t)

I am not sure whether this removes the duplicates.

thanks

1 Answer


Use the flatten and array_distinct functions together with groupBy and collect_list for this case.

Example:

df.show(10, False)
#+----+-----+---------------------------------------------------------+
#|year|month|json_col                                                 |
#+----+-----+---------------------------------------------------------+
#|2010|09   |[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]|
#|2010|09   |[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]    |
#|2007|10   |[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]  |
#|2007|10   |[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]   |
#+----+-----+---------------------------------------------------------+

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(StructType([StructField('p_id', StringType(), True)]))

df1 = df.withColumn("ff", from_json(col("json_col"), schema)).\
    select("year", "month", expr('transform(ff, f -> f.p_id)').alias("tmp"))

df1.groupBy("year", "month").\
    agg(to_json(array_distinct(flatten(collect_list(col("tmp"))))).alias("p_id")).\
    show(10, False)

#+----+-----+-------------------------------------------+
#|year|month|p_id                                       |
#+----+-----+-------------------------------------------+
#|2010|09   |["vfdvtbe","cdscs","usdvwq","ujhbe","yjev"]|
#|2007|10   |["ukerge","ikrtw","ikwca","unvwq","cqwcq"] |
#+----+-----+-------------------------------------------+