6

Looking for some info on using a custom partitioner in PySpark. I have a dataframe holding country data for various countries. If I repartition on the country column, Spark distributes my data into n partitions, keeping each country's data in specific partitions. This creates skewed partitions, which I can see using the glom() method.

Some countries, like USA and CHN, have a huge amount of data in this dataframe. I want to repartition my dataframe such that if the country is USA or CHN it is further split into some 10 partitions, while the partitions stay as they are for the other countries like IND, THA, AUS etc. Can we extend the partitioner class in PySpark code?

I have read in the link below that we can extend the Scala partitioner class in a Scala Spark application and modify it to use custom logic to repartition our data based on requirements, like the one I have. Please help me achieve this in PySpark. See the link: What is an efficient way to partition by column but maintain a fixed partition count?


I am using Spark version 2.3.0.2 and below is my Dataframe structure:

datadf= spark.sql(""" SELECT ID_NUMBER ,SENDER_NAME ,SENDER_ADDRESS ,REGION_CODE ,COUNTRY_CODE from udb.sometable """); 

The incoming data has records for six countries: AUS, IND, THA, RUS, CHN and USA. CHN and USA have skewed data.

So if I repartition on COUNTRY_CODE, two partitions contain a lot of data whereas the others are fine. I checked this using the glom() method.

newdf = datadf.repartition("COUNTRY_CODE")

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3, "COUNTRY_CODE", "USA")

I was trying to repartition my data into 3 partitions each for the countries USA and CHN only, and keep the other countries' data in a single partition each.

This is what I am expecting:

AUS - one partition
IND - one partition
THA - one partition
RUS - one partition
CHN - three partitions
USA - three partitions

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1182, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'repartitionByRange'

3 Answers

9

There is no custom partitioner in the Structured API, so in order to use a custom partitioner you'll need to drop down to the RDD API. It takes 3 simple steps:

  1. Convert Structured API to RDD API
dataRDD = dataDF.rdd 
  2. Apply custom partitioner in RDD API
import random

# Extract the key from the Row object.
# r[0] takes the first column as the key; with the question's schema you
# would use r["COUNTRY_CODE"] instead.
dataRDD = dataRDD.map(lambda r: (r[0], r))

def partitioner(key):
    if key == "CHN":
        return random.randint(1, 10)
    elif key == "USA":
        return random.randint(11, 20)
    else:
        # distinctCountryDict is a dict mapping the remaining distinct countries
        # to distinct integers; these integers should not overlap with the
        # 1-20 range used above. One way to build it is sketched after step 3.
        return distinctCountryDict[key]

numPartitions = 100
dataRDD = dataRDD.partitionBy(numPartitions, partitioner)

# Remove the key extracted previously
dataRDD = dataRDD.map(lambda r: r[1])
  3. Convert RDD API back to Structured API
dataDF = dataRDD.toDF() 
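
For completeness: distinctCountryDict in step 2 is left undefined in the answer. One possible way to build it (my assumption, not part of the original answer) is to collect the remaining distinct countries and give each one an integer id outside the 1-20 range reserved for CHN and USA:

others = [r["COUNTRY_CODE"] for r in dataDF.select("COUNTRY_CODE").distinct().collect()
          if r["COUNTRY_CODE"] not in ("CHN", "USA")]

# Start at 21 so these ids never collide with the CHN (1-10) and USA (11-20) ranges
distinctCountryDict = {c: 21 + i for i, c in enumerate(others)}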

This way you get the best of both worlds: Spark types and the optimized physical plan of the Structured API, as well as a custom partitioner in the low-level RDD API. And we only drop down to the low-level API when it's absolutely necessary.
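
To confirm the skew is gone, you can inspect the partition sizes the same way the question does, with glom() - a quick check on the RDD from step 2:

# Number of rows in each of the 100 partitions after the custom partitioner
print(dataRDD.glom().map(len).collect())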


5 Comments

I think when we use partitionBy with an RDD then the key should be of int type, otherwise it throws an error. What do you say?
@vikrantrana I'm not having this problem, could you post your error?
@ythdelmar - Thanks. I was doing something similar on an RDD and ran into this. I will check once again whether it works with the string datatype as well. Please see the link below for reference: stackoverflow.com/questions/47116294/…
@vikrantrana The thread also mentioned a potential problem, that you need to have a key-value pair RDD first in order to perform partitionBy, which is this line of my code from above dataRDD = dataRDD.map(lambda r: (r[0], r)). Perhaps you're getting your error from not having a key-value pair RDD?
@ythdelmar Does rdd.toDF() preserve partitioning ?
7

Try something like this with hashing:

newDf = oldDf.repartition(N, "col1", "coln")

or for ranging approach:

newDF = oldDF.repartitionByRange(N, "col1", "coln")
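
Applied to the question's dataframe this could look like the snippet below. The partition count of 10 is an arbitrary choice for illustration, and note that repartitionByRange only became available in the Python API in Spark 2.4, which is why the 2.3.0 attempt in the question raises AttributeError.

newdf = datadf.repartition(10, "COUNTRY_CODE")          # hash partitioning
newdf = datadf.repartitionByRange(10, "COUNTRY_CODE")   # range partitioning, PySpark 2.4+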

There is no custom partitioning for DataFrames just yet.

In your case I would go for hashing, but there are no guarantees.

But if your data is skewed you may need some extra work, with using two columns for partitioning being the simplest approach.

E.g. an existing or a new column - in this case a column that assigns a group number, e.g. 1 .. N, within a given country - and then partition on the two columns.

For the countries with a lot of data you get N synthetic subdivisions; for the others with low cardinality, only 1 such group number. Not too hard. Both kinds of partitioning can take more than one column.

In my view, uniformly filling partitions takes a lot of effort and is not really attainable, but a next-best approach like this one can suffice well enough. It amounts to custom partitioning to an extent.

Otherwise, using .withColumn on a DataFrame you can simulate custom partitioning by applying those rules to fill a new column, and then apply repartitionByRange. Also not so hard.
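
A minimal sketch of that last idea, assuming Spark 2.4+ (for repartitionByRange in Python) and the column names from the question; the choice of 3 sub-groups for the skewed countries is arbitrary:

from pyspark.sql import functions as F

skewed = ["USA", "CHN"]
subGroups = 3

# USA/CHN rows get a random sub-group 0..2; every other country gets group 0
withGroup = datadf.withColumn(
    "GROUP_NUM",
    F.when(F.col("COUNTRY_CODE").isin(skewed),
           (F.rand() * subGroups).cast("int"))
     .otherwise(F.lit(0)))

# 4 single-group countries + 2 skewed countries x 3 sub-groups = 10 target partitions
newDF = withGroup.repartitionByRange(10, "COUNTRY_CODE", "GROUP_NUM")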

18 Comments

How does the repartitionByRange function work? Can we use it in PySpark to repartition the dataframe?
Thanks for your help and suggestions. Could you please elaborate more on the last statement, about using .withColumn to simulate custom partitioning and then applying repartitionByRange?
Partitioning and re-partitioning are big topics - not always well understood. Hashing can lead to skewness more easily, ranges less so; it depends on the nature of the data. Do you need all the data in the same partition, or can you live with it spread across different partitions? But if you want more balanced partitions in terms of row counts - even though there may be a disbenefit - then using multiple columns can help, as well as the use of range partitioning. A good article: 24tutorials.com/spark/deep-dive-into-partitioning-in-spark However, what I described here I have done on other systems in the past. Success.
Yes that is the case
I cannot check as I only have 2.2 on spark-shell; Databricks 2.3 works with no issue. Exit for me. Pretty odd - wondering if it is a Databricks extension.
1

There is no direct way to apply a user-defined partitioner in PySpark. The shortcut is to create a new column with a UDF, assigning each record a partition ID based on the business logic, and then use the new column for partitioning; that way the data gets spread evenly.

numPartitions = 3

df = df.withColumn("Hash#", udf_country_hash(df['Country']))
df = df.withColumn("Partition#", df["Hash#"] % numPartitions)
df = df.repartition(numPartitions, "Partition#")

Please check the online version of code @ https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/2231943684776180/5846184720595634/latest.html
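
udf_country_hash is not defined in the snippet above; it comes from the linked notebook. A minimal sketch of what such a UDF could look like (my guess at its shape, not the notebook's actual code):

import zlib

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

@F.udf(returnType=IntegerType())
def udf_country_hash(country):
    # crc32 is deterministic across executors, unlike Python's salted built-in hash()
    if country is None:
        return 0
    return zlib.crc32(country.encode("utf-8")) & 0x7FFFFFFF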

In my experience, converting a DataFrame to an RDD and back to a DataFrame is a costly operation; it's better to avoid it.

1 Comment

This will create the required partitions, yes. But it is not necessary that each partition is processed in a separate task; you will notice this when you have a large number of partitions.
