
I have searched a lot for a succinct answer; hopefully someone can give me some clarity on Databricks partitioning.

Assume I have a DataFrame with the columns: Year, Month, Day, SalesAmount, StoreNumber

I want to store this partitioned by Year and Month, so I can run the following command:

df.write.partitionBy('Year', 'Month').format('csv').save('/mnt/path/', header='true') 

This will output data in the format of: /mnt/path/Year=2019/Month=05/<file-0000x>.csv

If I then load it back again, such as:

spark.read.format('csv').options(header='true').load('/mnt/path/').createOrReplaceTempView("temp1") 

Q1: This has not actually 'read' the data yet, right? I.e. I could have billions of records, but until I actually query temp1, nothing is executed against the source?

Q2-A: Subsequently, when querying this data using temp1, my assumption is that if I include the partitioning columns in the where clause, Spark will apply smart filtering to the files that are actually read off the disk?

%sql select * from temp1 where Year = 2019 and Month = 05 -- OPTIMAL 

whereas the following would not do any file filtering as it has no context of which partitions to look in:

%sql select * from temp1 where StoreNumber = 152 and SalesAmount > 10000 -- SUB-OPTIMAL 

Q2-B: Finally, if I stored the files in Parquet format (rather than *.csv), would both of the queries above 'push down' into the actual stored data, but perhaps in different ways?

I.e. the first would still use the partitions, but the second (where StoreNumber = 152 and SalesAmount > 10000) would now use the columnar storage of Parquet, while *.csv does not have that optimisation?

Can anyone please clarify my thinking / understanding around this?

Links to resources would also be great.

2 Answers

4

A1: You are right about the evaluation of createOrReplaceTempView. It is evaluated lazily for the current Spark session. In other words, if you terminate the Spark session without ever querying the view, the data will never be loaded into temp1.
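To make the lazy behaviour concrete, here is a minimal pure-Python sketch. It is only an analogy, not Spark's implementation: `LazyView`, `load_source`, and `read_log` are hypothetical names invented for this illustration. The view stores a recipe for producing rows, and the source is only touched when an action (`collect` here) runs.

```python
read_log = []  # hypothetical log: one entry each time the "source" is really read


def load_source():
    """Stand-in for reading files from storage (made-up sample rows)."""
    read_log.append("read")
    return [
        {"Year": 2019, "Month": 5, "SalesAmount": 120},
        {"Year": 2018, "Month": 7, "SalesAmount": 80},
    ]


class LazyView:
    """Toy lazily evaluated view: holds a recipe, not the data."""

    def __init__(self, compute):
        self._compute = compute  # function that produces rows when called

    def where(self, predicate):
        # Returns a new recipe; the source is still not read.
        return LazyView(lambda: [r for r in self._compute() if predicate(r)])

    def collect(self):
        # The "action": only here is the source actually read.
        return self._compute()


temp1 = LazyView(load_source)
query = temp1.where(lambda r: r["Year"] == 2019)
reads_before_action = len(read_log)  # nothing has been read so far
rows = query.collect()
reads_after_action = len(read_log)   # the read happened during collect()
```

If `collect` is never called, `read_log` stays empty, which mirrors a session that ends without the view ever being queried.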

A2: Let's examine the case through an example using your code. First let's save your data with:

df.write.mode("overwrite").option("header", "true")
  .partitionBy("Year", "Month")
  .format("csv")
  .save("/tmp/partition_test1/")

And then load it with:

val df1 = spark.read.option("header", "true")
  .csv("/tmp/partition_test1/")
  .where($"Year" === 2019 && $"Month" === 5)

Executing df1.explain will return:

== Physical Plan ==
*(1) FileScan csv [Day#328,SalesAmount#329,StoreNumber#330,Year#331,Month#332] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 0, PartitionFilters: [isnotnull(Year#331), isnotnull(Month#332), (Year#331 = 2019), (Month#332 = 5)], PushedFilters: [], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

As you can see, the PushedFilters: [] array is empty while PartitionFilters is not, indicating that Spark was able to filter on partitions and therefore prune the partitions that do not satisfy the where clause.
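The idea behind partition pruning can be sketched in plain Python, with no Spark involved. This is an illustrative model under stated assumptions, not how Spark implements it: `parse_partition_path` and `prune_files` are hypothetical helpers, and the directory tree is built in a temporary folder. The point is that the decision is made from directory names alone, before any file is opened.

```python
import os
import tempfile


def parse_partition_path(rel_dir):
    """Turn 'Year=2019/Month=5' into {'Year': '2019', 'Month': '5'}."""
    parts = {}
    for segment in rel_dir.split(os.sep):
        if "=" in segment:
            key, value = segment.split("=", 1)
            parts[key] = value
    return parts


def prune_files(root, wanted):
    """List data files whose partition values match `wanted`, using only directory names."""
    selected = []
    for dirpath, _dirs, files in os.walk(root):
        parts = parse_partition_path(os.path.relpath(dirpath, root))
        if all(parts.get(k) == v for k, v in wanted.items()):
            selected.extend(os.path.join(dirpath, f) for f in files)
    return sorted(selected)


# Build a small Year=/Month= layout with one CSV file per partition.
root = tempfile.mkdtemp()
for year, month in [("2018", "7"), ("2019", "5"), ("2019", "6")]:
    part_dir = os.path.join(root, f"Year={year}", f"Month={month}")
    os.makedirs(part_dir)
    with open(os.path.join(part_dir, "part-00000.csv"), "w") as fh:
        fh.write("Day,SalesAmount,StoreNumber\n1,100,152\n")

matched = prune_files(root, {"Year": "2019", "Month": "5"})  # filter on partition columns
unfiltered = prune_files(root, {})                            # no partition filter available
```

A filter on a non-partition column such as StoreNumber gives this model nothing to prune with, so it behaves like the `unfiltered` case and every file has to be opened.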

If we slightly change the Spark query to:

df1.where($"StoreNumber" === 1 && $"Year" === 2011 && $"Month" === 11).explain

== Physical Plan ==
*(1) Project [Day#462, SalesAmount#463, StoreNumber#464, Year#465, Month#466]
+- *(1) Filter (isnotnull(StoreNumber#464) && (cast(StoreNumber#464 as int) = 1))
   +- *(1) FileScan csv [Day#462,SalesAmount#463,StoreNumber#464,Year#465,Month#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 1, PartitionFilters: [isnotnull(Month#466), isnotnull(Year#465), (Year#465 = 2011), (Month#466 = 11)], PushedFilters: [IsNotNull(StoreNumber)], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

Now both PartitionFilters and PushedFilters are applied, minimizing the Spark workload. As you can see, Spark leverages both: it first prunes the existing partitions through PartitionFilters and then applies predicate pushdown to the files that remain.

Exactly the same applies to Parquet files, with the big difference that Parquet makes even more use of the pushed-down filters by combining them with its internal columnar storage (as you already mentioned), which keeps metrics and statistics about the data. With CSV files, the predicate pushdown takes place while Spark is reading/scanning the files, excluding records that do not satisfy the pushed-down condition. With Parquet, the pushed-down filter is propagated to the Parquet reader itself, resulting in even more pruning of data.
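A rough way to picture the statistics-based skipping is the following pure-Python sketch. It is a simplified model, assuming each chunk of a column keeps min/max values loosely in the spirit of Parquet row-group statistics; `make_row_groups` and `scan_greater_than` are hypothetical names, the sample values are made up, and a real Parquet reader is considerably more involved.

```python
def make_row_groups(values, group_size):
    """Split values into chunks and record min/max per chunk."""
    groups = []
    for start in range(0, len(values), group_size):
        chunk = values[start:start + group_size]
        groups.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return groups


def scan_greater_than(groups, threshold):
    """Return values above `threshold` and how many rows had to be examined."""
    matches, rows_examined = [], 0
    for group in groups:
        if group["max"] <= threshold:
            continue  # statistics prove no row here can match; skip the chunk
        rows_examined += len(group["rows"])
        matches.extend(v for v in group["rows"] if v > threshold)
    return matches, rows_examined


# Made-up SalesAmount values, stored two rows per chunk.
sales = [500, 900, 1200, 3000, 15000, 20000, 700, 800]
groups = make_row_groups(sales, group_size=2)

# Equivalent of: where SalesAmount > 10000
matches, rows_examined = scan_greater_than(groups, 10000)
```

In this toy data only one of the four chunks has to be examined, whereas a row-by-row scan of a CSV file would have to look at all eight values.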

In your case, loading the data through createOrReplaceTempView makes no difference; the execution plan remains the same.

Some useful links:

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

https://www.waitingforcode.com/apache-spark-sql/predicate-pushdown-spark-sql/read

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkStrategy-FileSourceStrategy.html


8 Comments

Fantastic answer, thanks for taking the time! On your last point re: createOrReplaceTempView, is there a way in which perf will be improved with Parquet vs CSV?
Hello @m1nkeh, I don't think so. Both the Spark SQL API and the DataFrame API are translated into the same logical/execution plan. The SQL API is for users who prefer writing SQL over the DataFrame syntax :)
OK, so if it stays in DataFrame syntax there is an optimization?
No problem at all. I mentioned it because you were already using it, and I wanted to clarify that there is no difference in the execution plan.
Hi there @m1nkeh, in this particular case there is nothing to improve since it is a very simple schema and query :) but in many other cases, yes, you would want to make such improvements.
1

Q1: When you read CSV files without providing a schema, Spark has to infer the schema, so a read of all files happens immediately (possibly filtering partitions at this point if it can). If you provide a schema, your assumptions about filtering are correct, as are your assumptions about when execution happens.
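Why inference forces a read can be shown with a toy pure-Python model. It assumes type inference is switched on and is only an analogy for what a reader must do, not Spark's code; `infer_column_types` is a hypothetical helper and the sample CSV text is made up. In PySpark itself an explicit schema is normally supplied through `spark.read.schema(...)`.

```python
import csv
import io


def infer_column_types(csv_text):
    """Guess 'int' vs 'string' per column; this needs to look at every data row."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    types = {name: "int" for name in header}
    rows_seen = 0
    for row in reader:
        rows_seen += 1
        for name, value in zip(header, row):
            if not value.lstrip("-").isdigit():
                types[name] = "string"  # one non-numeric value changes the column type
    return types, rows_seen


sample = "Day,SalesAmount,StoreNumber\n1,100,152\n2,n/a,153\n"

# Inference: every row is visited before any query can be planned.
inferred_types, rows_seen_for_inference = infer_column_types(sample)

# Explicit schema: the types are known up front, so no rows are read at this stage.
explicit_types = {"Day": "int", "SalesAmount": "string", "StoreNumber": "int"}
rows_seen_with_schema = 0
```

The `n/a` value on the last row is what decides the type of SalesAmount, which is why a reader that infers types cannot stop early.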

Q2: Not sure I follow. When you say two queries, do you mean the ones above or below? The one below does a write; how do you expect filtering to work on a write?

If you are referring to the first two queries run against Parquet, then the first will eliminate most files and be very quick. The second will hopefully skip some data by using the statistics stored in the files to show that it doesn't need to read them, but it will still touch every file.

You may find this useful: https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example

1 Comment

thanks for the reply, i read the question again and can see it wasn't clear.. i've (hopefully) clarified now by expanding out q2 in to q2a, and q2b... but essentially i am simply looking for validation on the way *.csv and *.parquet are stored, and 'optimised' under the covers... i located that link yesterday actually, and gave it a good read, i will read it again! 🙂
