0

I have a function that joins a list of dataframes to a base dataframe and returns a dataframe. I am trying to reduce the time this operation takes. Since I was joining multiple times using the base dataframe, I cached it but the runtime is still similar. This is the function I am using it

def merge_dataframes(base_df, df_list, id_col): """ Joins multiple dataframes using an identifier variable common across datasets :param base_df: everything will be added to this dataframe :param df_list: dfs that have to be joined to main dataset :param id_col: the identifier column :return: dataset with all joins """ base_df.persist(StorageLevel.MEMORY_AND_DISK) for each_df in df_list: base_df = base_df.join(each_df, id_col) base_df.unpersist() return base_df 

I was surprised to get similar results after caching. Whats the reason behind this and what can I do to make this consume less time.

Also since the datasets I am using currently are relatively small (~50k records) so I don't have an issue with caching datasets as and when needed as long as I decache them.

1 Answer 1

2

Join is a transformation - no calculation is triggered at this point

First:

You unpersist() it before the action.

Try remove unpersist and see what happens.

Second:

I'm afraid that in your case you can't benifit from persistance, because thing that is written in your code is same as:

base_df.join(df1, id_col).join(df2, id_col).join(df3, id_col)... 

In that case base_df is calculated only once and later only result of base_df.join() is used further. That means base_df is not reused.

Here is example where it would be reused:

base_df.join(df1, id_col) base_df.join(df2, id_col) 

But that does not fit your requirements. Depending on how base_df and list_dfs - how they are created you might want to consider pre-partition these dataframes with same partitioner - in that case join operation will not cause shuffle, that will greatly improve performance.

Another way is to perform broadcast join if dataframes in list_dfs are relatively small.

Sign up to request clarification or add additional context in comments.

2 Comments

The dataframes in df_list are all of same size. Their orders are different so I have to join them by the key. About the unpersist suggestion, I actually made a few more changes along with that which has reduced runtime but I'm not sure where to attribute those speed changes. But your suggestion led me to those few things I changed. Thanks
I think you could share them in providing your own answer, so for other people to see :)

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.