
I have a large parquet dataset that I am reading with Spark. Once read, I filter for a subset of rows which are used in a number of functions that apply different transformations:

The following is similar but not exact logic to what I'm trying to accomplish:

from pyspark.sql.functions import col, lit

df = spark.read.parquet(file)
special_rows = df.filter(col('special') > 0)

# Thinking about adding the following line
special_rows.cache()

def f1(df):
    new_df_1 = df.withColumn('foo', lit(0))
    return new_df_1

def f2(df):
    new_df_2 = df.withColumn('foo', lit(1))
    return new_df_2

new_df_1 = f1(special_rows)
new_df_2 = f2(special_rows)
output_df = new_df_1.union(new_df_2)
output_df.write.parquet(location)

Because a number of functions might use this filtered subset of rows, I'd like to cache or persist it in order to potentially improve execution speed and memory consumption. I understand that in the above example, there is no action called until my final write to parquet.

My question is: do I need to insert some sort of call to count(), for example, in order to trigger the caching, or will Spark, during that final write to parquet, be able to see that this dataframe is being used in f1 and f2 and cache the dataframe itself?

If yes, is this an idiomatic approach? Does this mean that production, large-scale Spark jobs that rely on caching frequently use arbitrary operations that pre-emptively force an action on the dataframe, such as a call to count?
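
In other words, would I need something like the sketch below, continuing from the snippet above (the count() here is just an arbitrary action to force materialization)?

special_rows = df.filter(col('special') > 0)
special_rows.cache()
special_rows.count()  # arbitrary action solely to materialize the cache -- is this needed?

new_df_1 = f1(special_rows)
new_df_2 = f2(special_rows)
output_df = new_df_1.union(new_df_2)
output_df.write.parquet(location)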

2 Answers

6

there is no action called until my final write to parquet.

and

Spark during that final write to parquet call will be able to see that this dataframe is being used in f1 and f2 and will cache the dataframe itself.

are both correct. If you run output_df.explain(), you will see the query plan, which confirms this.

Thus, there is no need to do special_rows.cache(). Generally, cache is only necessary if you intend to reuse the dataframe after forcing Spark to calculate something, e.g. after write or show. If you see yourself intentionally calling count(), you're probably doing something wrong.
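
For example, here is a sketch of a case where cache does pay off: the same dataframe feeds more than one action, so without caching the filter would be recomputed for each one. The paths below are placeholders, not from the question.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("input_path")         # placeholder path
special_rows = df.filter(col('special') > 0)

special_rows.cache()
special_rows.write.parquet("output_a")        # first action materializes the cache
special_rows.show()                           # second action reuses the cached rows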


3 Comments

Thanks @mck! As a quick follow up, what if I wanted this subset dataframe to potentially persist to disk as well? Would I need to use a persist() call? I will review the query plan momentarily, but I am trying to pre-emptively solve for the case where a large Spark application will be caching a large number of sizable dataframes.
If you also save new_df_1 and new_df_2 to parquet, then you should use cache, as you said.
@mck - I am new to Spark and still a bit confused. Do you mean that, even without the line special_rows.cache() in OP's code, Spark will realize during the write that special_rows is being used twice and will automatically cache it (so it won't be evaluated unnecessarily twice, just once), or do you mean that the line special_rows.cache() is needed, but an immediate action call like count is not, because there will be a write action later anyway?
1

You might want to repartition after running special_rows = df.filter(col('special') > 0). There can be a large number of empty partitions after running a filtering operation, as explained here.
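
A sketch of what that might look like, reusing the df from the question (the partition count is an arbitrary illustration, not a recommendation):

# repartition after the filter so downstream tasks don't drag along empty partitions
special_rows = df.filter(col('special') > 0).repartition(200)  # 200 is arbitrary; tune to your data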

Computing new_df_1 will cache special_rows, which will then be reused by new_df_2 in new_df_1.union(new_df_2). That's not necessarily a performance optimization. Caching is expensive. I've seen caching slow down a lot of computations, even when it's being used in a textbook manner (i.e. caching a DataFrame that gets reused several times downstream).

Calling count() does not necessarily ensure the data is cached. Counts avoid scanning rows whenever possible: they'll use the Parquet metadata when they can, which means they may not cache all the data as you might expect.
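
If you do want an explicit action to populate a cache, the usual pattern is to mark the dataframe with cache() first and then run an action on that same dataframe; a sketch, with the caveat above that the optimizer may still satisfy parts of the job from Parquet metadata:

special_rows = df.filter(col('special') > 0)
special_rows.cache()        # marks the dataframe for caching (lazy)
special_rows.count()        # action on the marked dataframe, intended to populate the cache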

You can also "cache" data by writing it to disk. Something like this:

df.filter(col('special') > 0).repartition(500).write.parquet("some_path")
special_rows = spark.read.parquet("some_path")

To summarize, yes, the DataFrame will be cached in this example, but it's not necessarily going to make your computation run any faster. It might be better to have no cache or to "cache" by writing data to disk.
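
Another option, if the goal is simply to keep the intermediate result out of executor memory, is persist() with an explicit storage level instead of the write/re-read round trip; a sketch (StorageLevel.DISK_ONLY is one choice, and MEMORY_AND_DISK is typically the default for DataFrame persist):

from pyspark.storagelevel import StorageLevel

special_rows = df.filter(col('special') > 0)
special_rows.persist(StorageLevel.DISK_ONLY)   # or StorageLevel.MEMORY_AND_DISK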

