I have a PySpark DataFrame and I am on a Spark version < 2.4.

Example dataframe:

column_1 <Array>           | column_2 <Array>                  | column_3 <Array> | join_columns
---------------------------|-----------------------------------|------------------|---------------------------------------------------
["2345", "98576", "09857"] | null                              | ["9857"]         | ["2345", "98576", "09857", "9857"]
null                       | ["87569", "9876"]                 | ["76586"]        | ["87569", "9876", "76586"]
["08798", "07564"]         | ["12345", "5768", "89687", "7564"] | ["7564"]         | ["08798", "07564", "12345", "5768", "89687", "7564"]
["03456", "09867"]         | ["87586"]                         | []               | ["03456", "09867", "87586"]

I would like to combine the three columns column_1, column_2 and column_3 into a single column "join_columns" and drop the duplicate values. I tried concat; it combined the columns, but only when each column holds a single value, perhaps because concat only works on strings:

df.withColumn("join_columns", concat(df.s, df.d)).drop_duplicates() 

How can I combine the values of array columns? Thank you.

3 Answers

1

Before Spark 2.4, you can use a udf:

from pyspark.sql.functions import udf

@udf('array<string>')
def array_union(*arr):
    return list(set([e.lstrip('0').zfill(5) for a in arr if isinstance(a, list) for e in a]))

df.withColumn('join_columns', array_union('column_1', 'column_2', 'column_3')).show(truncate=False)

Note: e.lstrip('0').zfill(5) normalizes each array item. It first strips the leading zeros, then left-pads the string with zeros if it is shorter than 5 characters. The isinstance(a, list) check skips null columns.
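To see what this normalization does without starting Spark, here is a minimal plain-Python sketch of the same logic. The helper names normalize and union_normalized are hypothetical, and sorted() is used only to make the output deterministic; the UDF above returns list(set(...)), whose order is not guaranteed.

```python
def normalize(code, width=5):
    """Strip leading zeros, then left-pad with zeros up to `width` characters."""
    return code.lstrip("0").zfill(width)


def union_normalized(*arrays):
    """Merge the arrays, skip anything that is not a list (e.g. None),
    and drop duplicates after normalizing each item."""
    return sorted({normalize(e) for a in arrays if isinstance(a, list) for e in a})


print(normalize("7564"))   # 07564
print(normalize("07564"))  # 07564
print(union_normalized(["08798", "07564"], ["12345", "5768", "89687", "7564"], ["7564"]))
```

Because "7564" and "07564" normalize to the same string, they collapse into one entry. If the two should stay distinct, as in the expected output shown in the question, the normalization step would have to be dropped.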

5 Comments

do you have a suggestion about this issue stackoverflow.com/questions/59055915/…
@verojoucla, looks fine to me. You can also use a list comprehension: df.selectExpr([ 'if({0} = array(""), null, {0}) AS {0}'.format(c) for c in df.columns]). If the array is actually empty, just change array("") to array().
but the proposed solution in the question link is not working.
very good :) ok I have a new challenge here ;) stackoverflow.com/questions/59104192/…
@verojoucla, added an answer, let me know if it works.
1

In Spark 2.4 you can combine these 3 columns and then use the flatten function:

from pyspark.sql.functions import array, flatten
df.withColumn("join_columns", flatten(array("column_1", "column_2", "column_3")))

In earlier Spark versions you can write a UDF that does the flattening:

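from pyspark.sql.functions import array, udf
from pyspark.sql.types import ArrayType, StringType

# ArrayType needs an element type; null sub-arrays are skipped while flattening
flatten_udf = udf(lambda arrs: [e for a in arrs if a is not None for e in a], ArrayType(StringType()))
df.withColumn("join_columns", flatten_udf(array("column_1", "column_2", "column_3")))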

2 Comments

I got this error in the udf function: TypeError: __init__() takes at least 2 arguments (1 given)
I just copy / paste your code, and I replaced the columns and df names.
1

Can you try the solution below? It requires Spark 2.4:

import pyspark.sql.functions as F

df = df.withColumn('col12', F.array_union(df.column_1, df.column_2))
df = df.withColumn('join_columns_dup', F.array_union(df.col12, df.column_3))
df = df.withColumn('join_columns', F.array_distinct(df.join_columns_dup))

With Spark < 2.4, you can use a UDF:

import pyspark.sql.functions as F
import pyspark.sql.types as Types

def array_concat(c1, c2, c3):
    # Treat a null column as an empty list, then drop duplicates
    return list(set((list() if c1 is None else c1) + (list() if c2 is None else c2) + (list() if c3 is None else c3)))

arrayConcatUdf = F.udf(array_concat, Types.ArrayType(Types.StringType()))
df = df.withColumn('join_columns', arrayConcatUdf(df.column_1, df.column_2, df.column_3))

Crude, but it works fine with null values as well.
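One caveat: list(set(...)) does not preserve the order of the items. If the order shown in the question's expected output matters, the following is a rough sketch of an order-preserving variant in plain Python. The name merge_keep_order is hypothetical, and it assumes exact string comparison, so "07564" and "7564" stay distinct as they do in the question's example.

```python
def merge_keep_order(*arrays):
    """Concatenate the arrays left to right, treating None as empty,
    and keep only the first occurrence of each value."""
    seen = set()
    merged = []
    for arr in arrays:
        for item in arr or []:  # a null column arrives as None
            if item not in seen:
                seen.add(item)
                merged.append(item)
    return merged


# Third and second rows of the example DataFrame in the question
print(merge_keep_order(["08798", "07564"], ["12345", "5768", "89687", "7564"], ["7564"]))
print(merge_keep_order(None, ["87569", "9876"], ["76586"]))
```

It should be possible to register this the same way as above, for example F.udf(merge_keep_order, Types.ArrayType(Types.StringType())), and call it with the three columns.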

3 Comments

Thanks for your answer, but I'm not using spark 2.4, I already mentioned in my question above.
I noticed that when one of the columns is null the join is not performed, but when the column is empty ([]) it is. How can I test whether the value of the column is null?
F.col('colName').isNull()
