
I read that RDDs with the same partitioner will be co-located. This is important to me because I want to join several large Hive tables that are not partitioned. My theory is that if I can get them partitioned (by a field called date_day) and co-located, then I can avoid shuffling.
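Roughly, the end-to-end pattern I have in mind looks like the sketch below (the table names are made up and I have not actually run this; it is just my reading of how two RDDs that share a partitioner should join without a shuffle):

    import datetime

    def date_day_partitioner(key):
        return (key - datetime.date(2017, 5, 1)).days

    # Key both tables by date_day and partition them with the same function and the
    # same number of partitions, so the join should not need to re-shuffle either side.
    rdd_a = sqlContext.sql("select * from hive.table_a").rdd \
        .map(lambda row: (row.date_day, row)) \
        .partitionBy(100, date_day_partitioner)
    rdd_b = sqlContext.sql("select * from hive.table_b").rdd \
        .map(lambda row: (row.date_day, row)) \
        .partitionBy(100, date_day_partitioner)

    joined = rdd_a.join(rdd_b)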

Here is what I am trying to do for each table:

    def date_day_partitioner(key):
        return (key.date_day - datetime.date(2017,05,01)).days

    df = sqlContext.sql("select * from hive.table")
    rdd = df.rdd
    rdd2 = rdd.partitionBy(100, date_day_partitioner)
    df2 = sqlContext.createDataFrame(rdd2, df_log_entry.schema)
    print df2.count()

Unfortunately, I can't even test my theory about co-location and avoiding shuffling, because I get the following error when I try partitionBy: ValueError: too many values to unpack

    Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-118755547579363441.py", line 346, in <module>
        raise Exception(traceback.format_exc())
    Exception: Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-118755547579363441.py", line 339, in <module>
        exec(code)
      File "<stdin>", line 15, in <module>
      File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 380, in count
        return int(self._jdf.count())
      File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling o115.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 6.0 failed 4 times, most recent failure: Lost task 21.3 in stage 6.0 (TID 182, ip-172-31-49-209.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 174, in main
        process()
      File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 169, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
        for obj in iterator:
      File "/usr/lib/spark/python/pyspark/rdd.py", line 1752, in add_shuffle_key
    ValueError: too many values to unpack

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        ...

I must be doing something wrong. Could you please help?


1 Answer


It's happening because you are not applying partitionBy to a key-value pair RDD. Your RDD must consist of (key, value) pairs, and the partition function you pass must return an integer for each key. I don't have sample data for your Hive table, so let's demonstrate the point with a Hive table of my own.

I have created the DataFrame below from a Hive table:

    df = spark.table("udb.emp_details_table");

    +------+--------+--------+----------------+
    |emp_id|emp_name|emp_dept|emp_joining_date|
    +------+--------+--------+----------------+
    |     1|     AAA|      HR|      2018-12-06|
    |     1|     BBB|      HR|      2017-10-26|
    |     2|     XXX|   ADMIN|      2018-10-22|
    |     2|     YYY|   ADMIN|      2015-10-19|
    |     2|     ZZZ|      IT|      2018-05-14|
    |     3|     GGG|      HR|      2018-06-30|
    +------+--------+--------+----------------+

Now, I want to partition my DataFrame so that similar keys end up in the same partition. So I converted the DataFrame to an RDD, since partitionBy can only be applied to an RDD for re-partitioning.

    myrdd = df.rdd
    newrdd = myrdd.partitionBy(10, lambda k: int(k[0]))
    newrdd.take(10)

I got the same error:

 File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1767, in add_shuffle_key for k, v in iterator: ValueError: too many values to unpack 

Hence, we need to convert our RDD into key-value pairs to use partitionBy:

    keypair_rdd = myrdd.map(lambda x: (x[0], x[1:]))

Now you can see that the RDD has been converted to key-value pairs, so you can distribute your data across partitions according to the keys available.

    [(u'1', (u'AAA', u'HR', datetime.date(2018, 12, 6))),
     (u'1', (u'BBB', u'HR', datetime.date(2017, 10, 26))),
     (u'2', (u'XXX', u'ADMIN', datetime.date(2018, 10, 22))),
     (u'2', (u'YYY', u'ADMIN', datetime.date(2015, 10, 19))),
     (u'2', (u'ZZZ', u'IT', datetime.date(2018, 5, 14))),
     (u'3', (u'GGG', u'HR', datetime.date(2018, 6, 30)))]

Using partitionBy on the key-value RDD now:

newrdd = keypair_rdd.partitionBy(5,lambda k: int(k[0])) 

Let's take a look at the partitions. The data is grouped, and similar keys are now stored in the same partitions. Two of the five partitions are empty because the keys 1, 2 and 3 map (modulo 5) to partitions 1, 2 and 3, leaving partitions 0 and 4 unused.

>>> print("Partitions structure: {}".format(newrdd.glom().map(len).collect())) Partitions structure: [0, 2, 3, 1, 0] 

Now let's say I want to partition my data with a custom scheme, so I created the function below to keep keys '1' and '3' in the same partition.

    def partitionFunc(key):
        import random
        # keep keys 1 and 3 in partition 0; send everything else randomly to partition 1 or 2
        if key == 1 or key == 3:
            return 0
        else:
            return random.randint(1, 2)

    newrdd = keypair_rdd.partitionBy(5, lambda k: partitionFunc(int(k[0])))

    >>> print("Partitions structure: {}".format(newrdd.glom().map(len).collect()))
    Partitions structure: [3, 3, 0, 0, 0]

As you can see, keys 1 and 3 are now stored in one partition and the rest in another.

I hope this helps. Try partitionBy on your DataFrame's underlying RDD; just make sure to convert it to key-value pairs first and have the partition function return an integer for each key.
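Applied back to your original snippet, the change is just the map into (key, value) pairs before partitionBy and the map back to plain rows afterwards; note that the key passed to your partitioner is then the date itself rather than the whole Row (a sketch, keeping your df_log_entry.schema variable as-is):

    import datetime

    def date_day_partitioner(key):
        # key is now the date_day value itself, not the whole Row
        return (key - datetime.date(2017, 5, 1)).days

    df = sqlContext.sql("select * from hive.table")
    keypair_rdd = df.rdd.map(lambda row: (row.date_day, row))   # build (key, value) pairs
    rdd2 = keypair_rdd.partitionBy(100, date_day_partitioner)
    plain_rdd = rdd2.map(lambda kv: kv[1])                       # drop the keys again
    df2 = sqlContext.createDataFrame(plain_rdd, df_log_entry.schema)
    print(df2.count())

One caveat: once the data is back in a DataFrame, the SQL planner does not know about the RDD-level partitioner, so a DataFrame join may still introduce an exchange; the co-location benefit really applies to RDD-level joins.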


1 Comment

Doesn't newrdd = keypair_rdd.partitionBy(5,lambda k: int(k[0])) lead to a TypeError as described here? Thus, shouldn't it be newrdd = keypair_rdd.partitionBy(5,lambda k: k)?
