I read that RDDs with the same partitioner will be co-located. This is important to me because I want to join several large Hive tables that are not partitioned. My theory is that if I can get them partitioned (by a field called date_day) and co-located, then I could avoid shuffling during the joins.
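To make my goal concrete, here is roughly what I understand the co-location claim to mean (a toy sketch with made-up data, not my real tables): if both sides of a join are keyed and partitioned with the same partitioner and the same number of partitions, the join should not need to shuffle either side again.

    # Two toy pair RDDs, both hash-partitioned into 4 partitions with the default partitioner
    rdd_a = sc.parallelize([(1, "a1"), (2, "a2"), (3, "a3")]).partitionBy(4)
    rdd_b = sc.parallelize([(1, "b1"), (2, "b2"), (4, "b4")]).partitionBy(4)

    # Because both sides use the same partitioner and partition count,
    # my understanding is that this join can reuse the existing partitioning
    joined = rdd_a.join(rdd_b)
    print joined.collect()

That is the effect I am after, except keyed by date_day over large Hive tables.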
Here is what I am trying to do for each table:
    import datetime

    def date_day_partitioner(key):
        # Partition by the number of days since 2017-05-01
        return (key.date_day - datetime.date(2017, 05, 01)).days

    df = sqlContext.sql("select * from hive.table")
    rdd = df.rdd
    rdd2 = rdd.partitionBy(100, date_day_partitioner)
    df2 = sqlContext.createDataFrame(rdd2, df_log_entry.schema)
    print df2.count()

Unfortunately, I can't even test my theory about co-location and avoiding shuffling, because I get the following error when I call partitionBy:

    ValueError: too many values to unpack
    Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-118755547579363441.py", line 346, in <module>
        raise Exception(traceback.format_exc())
    Exception: Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-118755547579363441.py", line 339, in <module>
        exec(code)
      File "<stdin>", line 15, in <module>
      File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 380, in count
        return int(self._jdf.count())
      File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling o115.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 6.0 failed 4 times, most recent failure: Lost task 21.3 in stage 6.0 (TID 182, ip-172-31-49-209.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 174, in main
        process()
      File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 169, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
        for obj in iterator:
      File "/usr/lib/spark/python/pyspark/rdd.py", line 1752, in add_shuffle_key
    ValueError: too many values to unpack

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    ...

I must be doing something wrong; could you please help?
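My best guess so far is that partitionBy expects an RDD of (key, value) pairs, whereas df.rdd gives me an RDD of Row objects, so the unpacking fails inside the shuffle code. If that is right, would keying the rows first, roughly like this, be the correct direction? (Just a sketch of my guess; note the partitioner would then receive the date itself rather than a Row.)

    import datetime

    # Guess: turn each Row into a (key, value) pair before calling partitionBy
    keyed = df.rdd.map(lambda row: (row.date_day, row))

    def date_day_partitioner(key):
        # key would now be the date itself, not a Row
        return (key - datetime.date(2017, 5, 1)).days

    rdd2 = keyed.partitionBy(100, date_day_partitioner)

    # Drop the keys again before rebuilding the DataFrame
    df2 = sqlContext.createDataFrame(rdd2.values(), df.schema)
    print df2.count()

And, assuming that works, would two tables keyed and partitioned this way actually end up co-located for the join?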