
Is there a reliable way to predict which Spark dataframe operations will preserve partitioning and which won't?

Specifically, let's say my dataframes are all partitioned with .repartition(500,'field1','field2'). Can I expect an output with 500 partitions arranged by these same fields if I apply:

  1. select()
  2. filter()
  3. groupBy() followed by agg() when grouping happens on 'field1' and 'field2' (as in the above)
  4. join() on 'field1' and 'field2' when both dataframes are partitioned as above

Given the special way my data is pre-partitioned, I'd expect no extra shuffling to take place. However, I always seem to end up with at least a few stages having a number of tasks equal to spark.sql.shuffle.partitions. Is there any way to avoid that extra shuffling?
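For concreteness, here's a rough sketch of the setup in Scala (the dataframe and its columns are made up); the extra stages I'm seeing show up as Exchange nodes in the explain() output:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Rough sketch; df and its columns field1, field2, value are made-up stand-ins.
val spark = SparkSession.builder().appName("partitioning-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, "a", 10), (2, "b", 20), (1, "a", 30)).toDF("field1", "field2", "value")
val repartitioned = df.repartition(500, $"field1", $"field2")

// select() and filter() are narrow transformations, so they keep the 500 partitions.
repartitioned.select("field1", "field2", "value").filter($"value" > 0).explain()

// For groupBy/agg (and joins) on field1/field2, any extra stage with
// spark.sql.shuffle.partitions tasks corresponds to an Exchange node in this plan.
repartitioned.groupBy("field1", "field2").agg(sum("value")).explain()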

Thanks

select and filter will not cause a shuffle as these are narrow transformations. GroupBy will cause a shuffle unless you have bucketing. Join will cause a shuffle unless you do a broadcast join. Commented Sep 15, 2018 at 9:13
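For reference, a small self-contained sketch of the broadcast join mentioned in the comment (the dataframes and their columns are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-demo").getOrCreate()
import spark.implicits._

// Made-up data standing in for a large and a small dataframe.
val largeDf = Seq((1, "a", 100), (2, "b", 200)).toDF("field1", "field2", "value")
val smallDf = Seq((1, "a", "x"), (2, "b", "y")).toDF("field1", "field2", "label")

// The broadcast hint ships the small side to every executor, so the large side
// is joined in place without being shuffled.
largeDf.join(broadcast(smallDf), Seq("field1", "field2")).explain()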

1 Answer


This is a well-known issue with Spark. Even if you have repartitioned the data, Spark will still shuffle it.

What is the Problem

Repartitioning ensures that each partition contains the data for a single column value.

Good example here:

val people = List(
  (10, "blue"),
  (13, "red"),
  (15, "blue"),
  (99, "red"),
  (67, "blue")
)
val peopleDf = people.toDF("age", "color")
val colorDf = peopleDf.repartition($"color")

Partition 00091
13,red
99,red

Partition 00168
10,blue
15,blue
67,blue

However, Spark doesn't remember this information for subsequent operations. Also, the ordering of the data across different partitions is not kept by Spark. That is, Spark knows that a single partition holds the data for one column value, but it doesn't know which other partitions hold data for the same column. In addition, the data needs to be sorted to guarantee that no shuffle is required.
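Here is a rough sketch of how this shows up (assuming a spark-shell session and a made-up output path): once the repartitioned data is written out and read back, the partitioning information is gone and the plan contains an Exchange again.

// Write the repartitioned data out and read it back; the hash-partitioning
// information is not carried along with a plain parquet write.
colorDf.write.parquet("/tmp/colors")
val reloaded = spark.read.parquet("/tmp/colors")

// The aggregation plans an Exchange again: look for
// "Exchange hashpartitioning(color, ...)" in the output.
reloaded.groupBy("color").count().explain()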

How can you solve it

You need to use the Spark bucketing feature to ensure there is no shuffle in subsequent stages.

I found this wiki pretty detailed about the bucketing feature.

Bucketing is an optimization technique in Spark SQL that uses buckets and bucketing columns to determine data partitioning.

The motivation is to optimize performance of a join query by avoiding shuffles (aka exchanges) of tables participating in the join. Bucketing results in fewer exchanges (and so stages).
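A rough sketch of how that could look for the dataframes in the question (the dataframe and table names are made up; bucketing requires writing the data out with saveAsTable on a Hive-enabled session):

// dfA and dfB stand in for the two dataframes being joined. Both sides are
// written out bucketed and sorted on the join/grouping keys.
dfA.write.bucketBy(500, "field1", "field2").sortBy("field1", "field2").saveAsTable("a_bucketed")
dfB.write.bucketBy(500, "field1", "field2").sortBy("field1", "field2").saveAsTable("b_bucketed")

// Reading the bucketed tables back, a join or groupBy on the bucketing columns
// can avoid the extra Exchange; check explain() for the absence of Exchange nodes.
val a = spark.table("a_bucketed")
val b = spark.table("b_bucketed")
a.join(b, Seq("field1", "field2")).explain()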


5 Comments

I thought that was about writing out and reading in. Fine, but what about doing many transformations after reading in?
When you do many transformations back to back, that would probably require a shuffle. Plus, you have to ensure all the joins/aggregations are on the bucketed columns. It is pretty hard, and currently Spark doesn't have great support for bucketBy, so you will probably end up having some shuffle.
Cool, but I think that is the thrust of this question.
Not sure how the question relates to the answer
Hi, and thanks for the answer. So the only way to avoid shuffling for join and groupBy is to switch to table views and do bucketing, while paying the extra cost of sorting (which seems to be a precondition for bucketing)? In other words, if I only want to join/group my dataframe once, the effort is likely not worth it?
