I have two PySpark dataframes.

One is:

name  start  end
bob   1      3
john  5      8

and the second is:

day  outcome
1    a
2    c
3    d
4    a
5    e
6    c
7    u
8    l

I need to concatenate the outcomes for the days in each person's range, like:

bob   acd
john  ecul

Is it possible to do so in PySpark?
from pyspark.sql import functions as F

df = df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))
df = df.groupBy('name').agg(F.collect_list(F.struct('day', 'outcome')).alias('outcome'))
df = df.withColumn('outcome', F.transform(F.array_sort('outcome'), lambda x: x.outcome))
df = df.withColumn('outcome', F.array_join('outcome', ''))

Use a non-equi join to join the dataframe df1 with df2:
+----+-----+---+---+-------+
|name|start|end|day|outcome|
+----+-----+---+---+-------+
|bob |1    |3  |1  |a      |
|bob |1    |3  |2  |c      |
|bob |1    |3  |3  |d      |
|john|5    |8  |5  |e      |
|john|5    |8  |6  |c      |
|john|5    |8  |7  |u      |
|john|5    |8  |8  |l      |
+----+-----+---+---+-------+

Group the dataframe by name and collect the pairs of day and outcome. This step is crucial because the day column is required to maintain the order of concatenation:
+----+--------------------------------+
|name|outcome                         |
+----+--------------------------------+
|john|[{5, e}, {6, c}, {7, u}, {8, l}]|
|bob |[{1, a}, {2, c}, {3, d}]        |
+----+--------------------------------+

Sort the array of pairs and transform it to extract the outcome values:
+----+------------+
|name|outcome     |
+----+------------+
|john|[e, c, u, l]|
|bob |[a, c, d]   |
+----+------------+

Join the array to get the result:
+----+-------+
|name|outcome|
+----+-------+
|john|ecul   |
|bob |acd    |
+----+-------+

You can avoid sequence if you do a conditional join:

df1.join(df2, on=(df1.start <= df2.day) & (df1.end >= df2.day))

Use Spark SQL. I used Scala, but SQL in PySpark is exactly the same, and I believe you can easily adapt it if there is any difference for PySpark.
Join the two dataframes, use collect_list() to get an array of outcomes, then use concat_ws() to concatenate the array into a string:
val dF1 = Seq(
  ("bob", 1, 3),
  ("john", 5, 8)
).toDF("name", "start", "end")
dF1.createOrReplaceTempView("dF1")

val dF2 = Seq(
  (1, "a"), (2, "c"), (3, "d"), (4, "a"),
  (5, "e"), (6, "c"), (7, "u"), (8, "l")
).toDF("day", "outcome")
dF2.createOrReplaceTempView("dF2")

spark.sql("""
  select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from (select d1.name, e.day
            from dF1 d1
                 lateral view explode(sequence(d1.start, d1.end)) e as day
         ) d1
    left join dF2 d2 on d1.day = d2.day
   group by d1.name
""").show(100, false)

Result:
+----+-------+
|name|outcome|
+----+-------+
|bob |acd    |
|john|ecul   |
+----+-------+

Fixing OOM (if exploding the sequence uses too much memory, use a non-equi join instead):
spark.sql("""
  select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from dF1 d1
    left join dF2 d2 on d1.start <= d2.day and d1.end >= d2.day
   group by d1.name
""").show(100, false)