
I have two pyspark dataframes:

one is:

name start end
bob  1     3
john 5     8

and second is:

day outcome
1   a
2   c
3   d
4   a
5   e
6   c
7   u
8   l

And I need to concatenate the day outcomes for every person, like:

bob  acd
john ecul

Is it possible to do so in pyspark?

2 Answers

Code

from pyspark.sql import functions as F

df = df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
df = df.groupBy('name').agg(F.collect_list(F.struct('day', 'outcome')).alias('outcome'))
df = df.withColumn('outcome', F.transform(F.array_sort('outcome'), lambda x: x.outcome))
df = df.withColumn('outcome', F.array_join('outcome', ''))

How does this work?

Use a non-equi join to join the dataframe df1 with df2:

+----+-----+---+---+-------+
|name|start|end|day|outcome|
+----+-----+---+---+-------+
|bob |1    |3  |1  |a      |
|bob |1    |3  |2  |c      |
|bob |1    |3  |3  |d      |
|john|5    |8  |5  |e      |
|john|5    |8  |6  |c      |
|john|5    |8  |7  |u      |
|john|5    |8  |8  |l      |
+----+-----+---+---+-------+

Group the dataframe by name and collect the pairs of day and outcome. This step is crucial because the day column is required to maintain the order of concatenation:

+----+--------------------------------+
|name|outcome                         |
+----+--------------------------------+
|john|[{5, e}, {6, c}, {7, u}, {8, l}]|
|bob |[{1, a}, {2, c}, {3, d}]        |
+----+--------------------------------+

Sort the array of pairs and transform it to extract the outcome values:

+----+------------+
|name|outcome     |
+----+------------+
|john|[e, c, u, l]|
|bob |[a, c, d]   |
+----+------------+

Join the array to get the result:

+----+-------+
|name|outcome|
+----+-------+
|john|ecul   |
|bob |acd    |
+----+-------+
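The four steps above can be traced in plain Python as well, with no Spark session needed; the lists and dicts below are illustrative stand-ins for the DataFrames:

```python
# Plain-Python sketch of the pipeline: non-equi join, group by name,
# sort each group's (day, outcome) pairs, then concatenate the outcomes.
df1 = [("bob", 1, 3), ("john", 5, 8)]           # name, start, end
df2 = [(1, "a"), (2, "c"), (3, "d"), (4, "a"),
       (5, "e"), (6, "c"), (7, "u"), (8, "l")]  # day, outcome

# Non-equi join: keep every (name, day, outcome) where start <= day <= end
joined = [(name, day, out)
          for name, start, end in df1
          for day, out in df2
          if start <= day <= end]

# Group by name, collecting (day, outcome) pairs
groups = {}
for name, day, out in joined:
    groups.setdefault(name, []).append((day, out))

# Sort each group's pairs by day, then join the outcomes into one string
result = {name: "".join(out for _, out in sorted(pairs))
          for name, pairs in groups.items()}

print(result)  # {'bob': 'acd', 'john': 'ecul'}
```

The sort on (day, outcome) pairs mirrors what F.array_sort does to the array of structs: structs compare field by field, so sorting by the leading day field restores the original day order before the outcomes are joined.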

3 Comments

I got an answer pretty similar to this :) except I think you don't need to create the sequence if you do a conditional join: df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
@Emma Agree, Good point :-)
Thanks a lot, your answer is perfectly right and correct; I can only accept one answer though, so I'm accepting the earliest.

Use Spark SQL. I used Scala, but SQL in PySpark is exactly the same, and I believe you can easily adapt it if there is any difference for PySpark.

Join the two dataframes, use collect_list() to get an array of outcomes, then use concat_ws() to concatenate the array into a string:

val dF1 = Seq(
  ("bob", 1, 3),
  ("john", 5, 8)
).toDF("name", "start", "end")
dF1.createOrReplaceTempView("dF1")

val dF2 = Seq(
  (1, "a"), (2, "c"), (3, "d"), (4, "a"),
  (5, "e"), (6, "c"), (7, "u"), (8, "l")
).toDF("day", "outcome")
dF2.createOrReplaceTempView("dF2")

spark.sql("""
select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
  from (select d1.name, e.day
          from dF1 d1
               lateral view explode(sequence(d1.start, d1.end)) e as day
       ) d1
       left join dF2 d2 on d1.day = d2.day
 group by d1.name
""").show(100, false)

Result:

+----+-------+
|name|outcome|
+----+-------+
|bob |acd    |
|john|ecul   |
+----+-------+
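The sequence + explode step can be mimicked in plain Python to see what the subquery produces: each row of dF1 is expanded into one row per day in its range, and those rows are then left-joined with dF2 on day. The names and shapes here are illustrative, not Spark API calls:

```python
# Sketch of the sequence(start, end) + explode + left join approach.
df1 = [("bob", 1, 3), ("john", 5, 8)]                       # name, start, end
df2 = {1: "a", 2: "c", 3: "d", 4: "a",
       5: "e", 6: "c", 7: "u", 8: "l"}                      # day -> outcome

# explode(sequence(start, end)): one (name, day) row per day in the range
exploded = [(name, day)
            for name, start, end in df1
            for day in range(start, end + 1)]

# Left join on day, then group by name and concat the outcomes;
# days with no match in df2 contribute nothing, like a null outcome.
result = {}
for name, day in exploded:
    result[name] = result.get(name, "") + df2.get(day, "")

print(result)  # {'bob': 'acd', 'john': 'ecul'}
```

Note this sketch keeps the days in order by construction; in real Spark SQL, collect_list after a shuffle does not guarantee ordering, which is why the first answer carries the day column along and sorts before joining.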

Fixing OOM:

spark.sql(""" select d1.name, concat_ws('',collect_list(d2.outcome)) outcome from dF1 d1 left join dF2 d2 on d1.start<=d2.day and d1.end>=d2.day group by d1.name """).show(100, false) 

13 Comments

Thanks so much! It works, but I run out of memory, so I will do all this another way. My second df is about 200 MB.
@user453575457 If it is too big, then try the second query; it can work directly without sequence + explode.
Your answer is perfectly correct, but Spark has very poor performance here, so I will do that another way. With 100 entries (10 KB in the first df, 7 MB in the second) it takes 40 seconds. With 1000 entries I gave up waiting after 10 min, and I have 900k entries in the first frame, so there is no chance it will ever complete.
Read the answers here: stackoverflow.com/a/55964221/2700344. You need to set spark.executor.memory and spark.driver.memory. Start with 2G and increase if necessary. You can also set driver and executor cores and the number of executors.
Worked like magic! Now 1k finishes in 5 min, and even better, I understand now how to spread this work across more nodes. Thanks a lot!
