I have a scenario with two tables: one holds a number of days per key, and the other holds values. For each row of the values table, I need to sum the values over as many rows as the days table specifies. The dataframes:

# dataframe1
df1 = spark.createDataFrame(
    [
        ('ll',5),
        ('yy',6)
    ],
    ('x','days')
)

# dataframe2
df = spark.createDataFrame(
    [
        ('ll','2020-01-05','1','10'),
        ('ll','2020-01-06','1','10'),
        ('ll','2020-01-07','1','10'),
        ('ll','2020-01-08','1','10'),
        ('ll','2020-01-09','1','10'),
        ('ll','2020-01-10','1','10'),
        ('ll','2020-01-11','1','20'),
        ('ll','2020-01-12','1','10'),
        ('ll','2020-01-05','2','30'),
        ('ll','2020-01-06','2','30'),
        ('ll','2020-01-07','2','30'),
        ('ll','2020-01-08','2','40'),
        ('ll','2020-01-09','2','30'),
        ('ll','2020-01-10','2','10'),
        ('ll','2020-01-11','2','10'),
        ('ll','2020-01-12','2','10'),
        ('yy','2020-01-05','1','20'),
        ('yy','2020-01-06','1','20'),
        ('yy','2020-01-07','1','20'),
        ('yy','2020-01-08','1','20'),
        ('yy','2020-01-09','1','20'),
        ('yy','2020-01-10','1','40'),
        ('yy','2020-01-11','1','20'),
        ('yy','2020-01-12','1','20'),
        ('yy','2020-01-05','2','40'),
        ('yy','2020-01-06','2','40'),
        ('yy','2020-01-07','2','40'),
        ('yy','2020-01-08','2','40'),
        ('yy','2020-01-09','2','40'),
        ('yy','2020-01-10','2','40'),
        ('yy','2020-01-11','2','60'),
        ('yy','2020-01-12','2','40')
    ],
    ('x','date','flag','value')
)

expected_dataframe = spark.createDataFrame(
    [
        ('ll','2020-01-05','1','10','50'),
        ('ll','2020-01-06','1','10','50'),
        ('ll','2020-01-07','1','10','60'),
        ('ll','2020-01-08','1','10','60'),
        ('ll','2020-01-09','1','10','50'),
        ('ll','2020-01-10','1','10','40'),
        ('ll','2020-01-11','1','20','30'),
        ('ll','2020-01-12','1','10','10'),
        ('ll','2020-01-05','2','30','170'),
        ('ll','2020-01-06','2','30','140'),
        ('ll','2020-01-07','2','30','120'),
        ('ll','2020-01-08','2','40','100'),
        ('ll','2020-01-09','2','30','60'),
        ('ll','2020-01-10','2','10','30'),
        ('ll','2020-01-11','2','10','20'),
        ('ll','2020-01-12','2','10','10'),
        ('yy','2020-01-05','1','20','140'),
        ('yy','2020-01-06','1','20','140'),
        ('yy','2020-01-07','1','20','140'),
        ('yy','2020-01-08','1','20','120'),
        ('yy','2020-01-09','1','20','100'),
        ('yy','2020-01-10','1','40','80'),
        ('yy','2020-01-11','1','20','40'),
        ('yy','2020-01-12','1','20','20'),
        ('yy','2020-01-05','2','40','240'),
        ('yy','2020-01-06','2','40','260'),
        ('yy','2020-01-07','2','40','260'),
        ('yy','2020-01-08','2','40','220'),
        ('yy','2020-01-09','2','40','180'),
        ('yy','2020-01-10','2','40','140'),
        ('yy','2020-01-11','2','60','100'),
        ('yy','2020-01-12','2','40','40')
    ],
    ('x','date','flag','value','result')
)

expected_results

+---+----------+----+-----+------+
|  x|      date|flag|value|result|
+---+----------+----+-----+------+
| ll|2020-01-05|   1|   10|    50|
| ll|2020-01-06|   1|   10|    50|
| ll|2020-01-07|   1|   10|    60|
| ll|2020-01-08|   1|   10|    60|
| ll|2020-01-09|   1|   10|    50|
| ll|2020-01-10|   1|   10|    40|
| ll|2020-01-11|   1|   20|    30|
| ll|2020-01-12|   1|   10|    10|
| ll|2020-01-05|   2|   30|   170|
| ll|2020-01-06|   2|   30|   140|
| ll|2020-01-07|   2|   30|   120|
| ll|2020-01-08|   2|   40|   100|
| ll|2020-01-09|   2|   30|    60|
| ll|2020-01-10|   2|   10|    30|
| ll|2020-01-11|   2|   10|    20|
| ll|2020-01-12|   2|   10|    10|
| yy|2020-01-05|   1|   20|   140|
| yy|2020-01-06|   1|   20|   140|
| yy|2020-01-07|   1|   20|   140|
| yy|2020-01-08|   1|   20|   120|
| yy|2020-01-09|   1|   20|   100|
| yy|2020-01-10|   1|   40|    80|
| yy|2020-01-11|   1|   20|    40|
| yy|2020-01-12|   1|   20|    20|
| yy|2020-01-05|   2|   40|   240|
| yy|2020-01-06|   2|   40|   260|
| yy|2020-01-07|   2|   40|   260|
| yy|2020-01-08|   2|   40|   220|
| yy|2020-01-09|   2|   40|   180|
| yy|2020-01-10|   2|   40|   140|
| yy|2020-01-11|   2|   60|   100|
| yy|2020-01-12|   2|   40|    40|
+---+----------+----+-----+------+

code

from pyspark.sql.window import Window
from pyspark.sql.functions import col, to_date

# Attach the per-key `days` setting to every row and parse the date strings.
df_join = df.join(df1,['x'],'inner').withColumn('date',to_date(col('date'),'yyyy-MM-dd'))

# One window per (x, flag) group, newest date first.
w1 = Window.partitionBy('x','flag').orderBy(col('date').desc())

So I need to sum the value column based on the days column, i.e. if the days column is 5, I need to sum 5 rows of the values.

I joined the two tables and tried to solve this with a window function, but it didn't work out and I can't figure out how to solve it. Can anyone show me how to do it?

1 Answer

First you could join on x, then compute row_number() over each (x, flag) group and use it to turn every value whose row number is greater than days into null. Then sum over a partition-only window to broadcast that sum across all the rows of the group.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Ordered window: numbers the rows of each (x, flag) group by date.
# The dates look like '2020-01-05', so the pattern is yyyy-MM-dd.
w=Window().partitionBy("x","flag").orderBy(F.to_date("date","yyyy-MM-dd"))
# Partition-only window: spans the whole (x, flag) group.
w1=Window().partitionBy("x","flag")

(df.join(df1, ['x'])
   .withColumn("rowNum", F.row_number().over(w))
   # Null out rows beyond the first `days` rows, then sum over the whole group.
   .withColumn("expected_result",
               F.sum(F.when(F.col("rowNum")>F.col("days"), F.lit(None))
                      .otherwise(F.col("value"))).over(w1))
   .drop("days","rowNum")
   .show())

#+---+----------+----+-----+---------------+
#|  x|      date|flag|value|expected_result|
#+---+----------+----+-----+---------------+
#| ll|2020-01-05|   1|   10|           50.0|
#| ll|2020-01-06|   1|   10|           50.0|
#| ll|2020-01-07|   1|   10|           50.0|
#| ll|2020-01-08|   1|   10|           50.0|
#| ll|2020-01-09|   1|   10|           50.0|
#| ll|2020-01-10|   1|   10|           50.0|
#| ll|2020-01-11|   1|   10|           50.0|
#| ll|2020-01-12|   1|   10|           50.0|
#| ll|2020-01-05|   2|   30|          150.0|
#| ll|2020-01-06|   2|   30|          150.0|
#| ll|2020-01-07|   2|   30|          150.0|
#| ll|2020-01-08|   2|   30|          150.0|
#| ll|2020-01-09|   2|   30|          150.0|
#| ll|2020-01-10|   2|   10|          150.0|
#| ll|2020-01-11|   2|   10|          150.0|
#| ll|2020-01-12|   2|   10|          150.0|
#| yy|2020-01-05|   1|   20|          120.0|
#| yy|2020-01-06|   1|   20|          120.0|
#| yy|2020-01-07|   1|   20|          120.0|
#| yy|2020-01-08|   1|   20|          120.0|
#+---+----------+----+-----+---------------+
#only showing top 20 rows
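To see what this first version computes without running Spark, here is a minimal plain-Python sketch of the same logic for a single (x, flag) group. The helper name `broadcast_first_n_sum` is made up for illustration, and the list is assumed to be already sorted by date.

```python
def broadcast_first_n_sum(values, days):
    """Sum the first `days` values of one group and repeat that total for every row."""
    total = sum(values[:days])      # rows with rowNum > days contribute nothing
    return [total] * len(values)    # the partition-only window broadcasts the total


# Values of the ('ll', flag 1) group from the question, ordered by date.
ll_flag1 = [10, 10, 10, 10, 10, 10, 20, 10]
print(broadcast_first_n_sum(ll_flag1, 5))
# [50, 50, 50, 50, 50, 50, 50, 50]
```

Every row of a group gets the same total, so this only fits the case where one sum per group is wanted, not a sum that moves with each row.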

UPDATE:

For Spark 2.4+, you could use the higher-order functions transform and aggregate after collect_list. I assumed the data is ordered as in the example provided; if that's not the case, an extra step needs to be added to ensure that.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Partition-only window: collects every value of the (x, flag) group.
w=Window().partitionBy("x","flag")
# Ordered window: gives each row its zero-based position within the group.
# The dates look like '2020-01-05', so the pattern is yyyy-MM-dd.
w1=Window().partitionBy("x","flag").orderBy(F.to_date("date","yyyy-MM-dd"))

(df.join(df1,['x'])
   .withColumn("result", F.collect_list("value").over(w))
   .withColumn("rowNum", F.row_number().over(w1)-1)
   # Pair each value with its index, then add up the values whose index
   # falls in [rowNum, rowNum + days).
   .withColumn("result", F.expr("""aggregate(transform(result,(x,i)->array(x,i)),0,(acc,x)->
       IF((int(x[1])>=rowNum)and(int(x[1])<days+rowNum),int(x[0])+acc,acc))"""))
   .drop("flag","rowNum","days")
   .show())

#+---+----------+-----+------+
#|  x|      date|value|result|
#+---+----------+-----+------+
#| ll|2020-01-05|   10|    50|
#| ll|2020-01-06|   10|    50|
#| ll|2020-01-07|   10|    60|
#| ll|2020-01-08|   10|    60|
#| ll|2020-01-09|   10|    50|
#| ll|2020-01-10|   10|    40|
#| ll|2020-01-11|   20|    30|
#| ll|2020-01-12|   10|    10|
#| ll|2020-01-05|   30|   160|
#| ll|2020-01-06|   30|   140|
#| ll|2020-01-07|   30|   120|
#| ll|2020-01-08|   40|   100|
#| ll|2020-01-09|   30|    60|
#| ll|2020-01-10|   10|    30|
#| ll|2020-01-11|   10|    20|
#| ll|2020-01-12|   10|    10|
#| yy|2020-01-05|   20|   140|
#| yy|2020-01-06|   20|   140|
#| yy|2020-01-07|   20|   140|
#| yy|2020-01-08|   20|   120|
#| yy|2020-01-09|   20|   100|
#| yy|2020-01-10|   40|    80|
#| yy|2020-01-11|   20|    40|
#| yy|2020-01-12|   20|    20|
#| yy|2020-01-05|   40|   240|
#| yy|2020-01-06|   40|   260|
#| yy|2020-01-07|   40|   260|
#| yy|2020-01-08|   40|   220|
#| yy|2020-01-09|   40|   180|
#| yy|2020-01-10|   40|   140|
#| yy|2020-01-11|   60|   100|
#| yy|2020-01-12|   40|    40|
#+---+----------+-----+------+

Also, in your example, row number 9 should be 160, instead of 170.
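As a quick sanity check of that number, here is a minimal plain-Python sketch of the rule as I understand it: each row sums its own value plus the following rows, up to `days` rows in total, and takes whatever is left near the end of the group. The function name `forward_window_sums` is hypothetical, and the lists are assumed to be sorted by date within one (x, flag) group.

```python
def forward_window_sums(values, days):
    """For each position i, sum values[i : i + days]; shorter tails sum what remains."""
    return [sum(values[i:i + days]) for i in range(len(values))]


# Groups from the question, ordered by date ('ll' has days = 5).
ll_flag1 = [10, 10, 10, 10, 10, 10, 20, 10]
ll_flag2 = [30, 30, 30, 40, 30, 10, 10, 10]

print(forward_window_sums(ll_flag1, 5))
# [50, 50, 60, 60, 50, 40, 30, 10]
print(forward_window_sums(ll_flag2, 5))
# [160, 140, 120, 100, 60, 30, 20, 10]
```

The first entry for flag 2 is 30 + 30 + 30 + 40 + 30 = 160. A group with fewer rows than `days` simply sums all of its remaining rows, for example `forward_window_sums([10, 10, 10, 10], 5)` starts with 40.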

5 Comments

One query: suppose for flag 1 I have only 4 rows but my days value is 5. If the days are met I need to sum that many rows; if the days are not met I need to sum all the rows in the flag.
modified the expected result
The new result column doesn't make sense to me. The first two rows are 50, the next two rows are 60, and then it decreases incrementally. Could you elaborate on how it is formulated?
@Mohammad Murtaza Hashmi So let's say you have 6 rows in the flag 1 section and your days value is 5. For the first row it sums 5 rows, then for the 2nd row it sums the remaining 5 rows, but when you get to the 3rd row you only have 4 rows left, so we have to sum 4 rows. So ideally each row takes the values of the next 5 dates; if you don't have 5 rows, take whatever rows are available. I hope that helps.
@Anji happy to help, please accept answer to close the thread
