
Using PySpark and Delta Lake on Databricks, I have the following scenario:

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
analysis_1 = result.groupBy(...).count() # transformation performed here
analysis_2 = result.groupBy(...).count() # transformation performed here

As I understand Spark (with Delta Lake), due to lazy evaluation, result is not actually computed upon declaration, but rather when it is used.

However, in this example, it is used multiple times, and hence the most expensive transformation is computed multiple times.

Is it possible to force execution at some point in the code, e.g.

sdf = spark.read.format("delta").table("...")
result = sdf.filter(...).groupBy(...).agg(...)
result.force() # transformation performed here??
analysis_1 = result.groupBy(...).count() # quick smaller transformation??
analysis_2 = result.groupBy(...).count() # quick smaller transformation??
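For illustration, a rough sketch of how the recomputation shows up (the collect() calls and the timing code are placeholders I have added just to make the cost visible):

import time

t0 = time.time()
rows_1 = analysis_1.collect()  # first action: runs the filter/groupBy/agg from the Delta table
t1 = time.time()
rows_2 = analysis_2.collect()  # second action: the same expensive chain runs again
t2 = time.time()

print(f"analysis_1: {t1 - t0:.1f}s, analysis_2: {t2 - t1:.1f}s")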

1 Answer


The question is, in my view, a little all over the place, or not entirely clear. But if you are new to Spark, that can be the case.

So:

.force applies to lazy Scala collections (see https://blog.knoldus.com/getting-lazy-with-scala/); it will not work on a Dataset or DataFrame.

Does this have anything specifically to do with PySpark or the Delta Lake approach? No.

analysis_1 = result.groupBy(...).count() # quick smaller transformation?? 
  • This is in fact where the expensive work is triggered: the preceding transformations, most likely including a shuffle, run when this result is evaluated (see the sketch below).
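A minimal sketch of that distinction, with a hypothetical column name: groupBy(...).count() only extends the query plan; the shuffle and the job run once an action is executed.

# groupBy().count() is still lazy: it returns a new DataFrame and only extends the plan.
analysis_1 = result.groupBy("key").count()

# The physical plan contains an Exchange node, i.e. a shuffle, for the groupBy.
analysis_1.explain()

# The job, including the shuffle, actually runs when an action such as show() is executed.
analysis_1.show()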

So, I think what you mean, as our esteemed pault states, is the following:

  • .cache() or .persist()

You would need, I suspect:

result.cache()

This would mean your second job, for analysis_2, would not need to recompute all the way back to the source, as shown here:

(2) Spark Jobs
Job 16 (Stages: 3/3)
  Stage 43: 8/8     succeeded / total tasks
  Stage 44: 200/200 succeeded / total tasks
  Stage 45: 1/1     succeeded / total tasks
Job 17 (Stages: 2/2, 1 skipped)
  Stage 46: 0/8     succeeded / total tasks (skipped)
  Stage 47: 200/200 succeeded / total tasks
  Stage 48: 1/1     succeeded / total tasks
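Putting this together, a minimal sketch under the assumptions of the question (the table and column names are hypothetical, and spark is the Databricks session):

# Hypothetical table and column names, for illustration only.
sdf = spark.read.format("delta").table("my_db.my_table")

# Expensive transformation chain; nothing is executed yet (lazy evaluation).
result = (
    sdf.filter(sdf.status == "ok")
       .groupBy("key", "region")
       .agg({"value": "sum"})
)

# Mark result for caching. cache() is itself lazy, so materialize it once
# with a cheap action such as count().
result.cache()
result.count()  # the expensive aggregation runs exactly once, here

# These now build on the cached result instead of recomputing from the Delta table.
analysis_1 = result.groupBy("key").count()
analysis_2 = result.groupBy("region").count()

analysis_1.show()
analysis_2.show()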

With improvements made to Spark, shuffle output is kept and reused, which results in skipped stages in some cases as well, in particular for RDDs. For DataFrames, caching is required to get the skipped-stage effect I observe.
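For completeness, a minimal sketch of using persist with an explicit storage level instead of cache, and releasing the memory afterwards (same hypothetical result as above):

from pyspark.storagelevel import StorageLevel

# Same idea as cache(), but with an explicit storage level.
result.persist(StorageLevel.MEMORY_AND_DISK)

# ... run the actions that reuse result ...

# Free the cached data once it is no longer needed.
result.unpersist()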


1 Comment

I am indeed new to Spark, so I'm trying to grasp the various concepts. It is, however, progressing :-) Thank you for the explanation. As @pault hinted, it was the concept of cache that I was looking for. Thank you all.
