
I'm trying to merge three RDDs based on the same key. The following is the data.

+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
|     2|    Panda|   15|
|     3|    Candy|   15|
|     1|  Bahroze|   15|
+------+---------+-----+

+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
|     2|    Panda| 7342|
|     3|    Candy| 5669|
|     1|  Bahroze| 8361|
+------+---------+-----+

+------+---------+-----+
|UserID|UserLabel|Total|
+------+---------+-----+
|     2|    Panda|   37|
|     3|    Candy|   27|
|     1|  Bahroze|   39|
+------+---------+-----+

I'm able to merge these three DFs. I converted each of them to an RDD of dicts with the following code:

new_rdd = userTotalVisits.rdd.map(lambda row: row.asDict(True)) 

After the RDD conversion, I keep one RDD and collect the other two as lists. I map over the first RDD and add the other keys to each row by matching on the same UserID. I was hoping there was a better way of doing this in PySpark. Here's the code I've written.

def transform(row):
    # Add a new key to each row
    for x in conversion_list:  # first rdd as a list of dicts, i.e. [{}], after using collect()
        if x['UserID'] == row['UserID']:
            row["Total"] = {
                "Visitors": row["Total"],
                "Conversions": x["Total"]
            }
    for y in Revenue_list:  # second rdd as a list of dicts, i.e. [{}], after using collect()
        if y['UserID'] == row['UserID']:
            row["Total"]["Revenue"] = y["Total"]
    return row

potato = new_rdd.map(lambda row: transform(row))  # first rdd
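As an aside on the lookup cost: the nested loops above rescan the whole collected lists for every row. Even outside Spark, indexing each collected list by UserID first makes every lookup O(1) instead of O(n). A minimal plain-Python sketch (the sample data here is copied from the question; no Spark required):

```python
# Illustrative only: index the collected lists by UserID, then each row
# needs a single dict lookup instead of a full scan per list.
visitors = [
    {"UserID": "2", "UserLabel": "Panda", "Total": 37},
    {"UserID": "3", "UserLabel": "Candy", "Total": 27},
]
conversions = {r["UserID"]: r["Total"]
               for r in [{"UserID": "2", "Total": 15}, {"UserID": "3", "Total": 15}]}
revenue = {r["UserID"]: r["Total"]
           for r in [{"UserID": "2", "Total": 7342}, {"UserID": "3", "Total": 5669}]}

def transform(row):
    # Replace the flat Total with the nested struct the question builds.
    row["Total"] = {
        "Visitors": row["Total"],
        "Conversions": conversions.get(row["UserID"]),
        "Revenue": revenue.get(row["UserID"]),
    }
    return row

merged = [transform(dict(r)) for r in visitors]
# merged[0]["Total"] == {"Visitors": 37, "Conversions": 15, "Revenue": 7342}
```

This only fixes the per-row scan; the answers below avoid collecting to the driver entirely by joining the DataFrames instead.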

How should I efficiently merge these three RDDs/DFs? (I had to perform three different tasks on a huge DF.) I'm looking for a more efficient approach. PS: I'm still a Spark newbie. The result of my code is as follows, which is what I need.

{'UserID': '2', 'UserLabel': 'Panda', 'Total': {'Visitors': 37, 'Conversions': 15, 'Revenue': 7342}}
{'UserID': '3', 'UserLabel': 'Candy', 'Total': {'Visitors': 27, 'Conversions': 15, 'Revenue': 5669}}
{'UserID': '1', 'UserLabel': 'Bahroze', 'Total': {'Visitors': 39, 'Conversions': 15, 'Revenue': 8361}}

Thank you.

2 Answers


You can join the 3 dataframes on the columns ["UserID", "UserLabel"], then create a new struct Total from the 3 total columns:

from pyspark.sql import functions as F

result = df1.alias("conv") \
    .join(df2.alias("rev"), ["UserID", "UserLabel"], "left") \
    .join(df3.alias("visit"), ["UserID", "UserLabel"], "left") \
    .select(
        F.col("UserID"),
        F.col("UserLabel"),
        F.struct(
            F.col("conv.Total").alias("Conversions"),
            F.col("rev.Total").alias("Revenue"),
            F.col("visit.Total").alias("Visitors")
        ).alias("Total")
    )

# write into json file
result.write.json("output")

# print result:
for i in result.toJSON().collect():
    print(i)

# {"UserID":3,"UserLabel":"Candy","Total":{"Conversions":15,"Revenue":5669,"Visitors":27}}
# {"UserID":1,"UserLabel":"Bahroze","Total":{"Conversions":15,"Revenue":8361,"Visitors":39}}
# {"UserID":2,"UserLabel":"Panda","Total":{"Conversions":15,"Revenue":7342,"Visitors":37}}



You can just do left joins on all three dataframes, but make sure the first dataframe you use has all the UserID and UserLabel values. You can skip the groupBy operation suggested by @blackbishop and it would still give you the required output.

I am showing how it can be done in Scala, but you could do something similar in Python.

// source data
val visitorDF = Seq((2,"Panda",15),(3,"Candy",15),(1,"Bahroze",15),(4,"Test",25)).toDF("UserID","UserLabel","Total")
val conversionsDF = Seq((2,"Panda",37),(3,"Candy",27),(1,"Bahroze",39)).toDF("UserID","UserLabel","Total")
val revenueDF = Seq((2,"Panda",7342),(3,"Candy",5669),(1,"Bahroze",8361)).toDF("UserID","UserLabel","Total")

import org.apache.spark.sql.functions._

val finalDF = visitorDF.as("v")
  .join(conversionsDF.as("c"), Seq("UserID","UserLabel"), "left")
  .join(revenueDF.as("r"), Seq("UserID","UserLabel"), "left")
  .withColumn("TotalArray", struct($"v.Total".as("Visitor"), $"c.Total".as("Conversions"), $"r.Total".as("Revenue")))
  .drop("Total")

display(finalDF)

You can see the output below:

[image: screenshot of finalDF output]

