I'm pretty new to Apache Spark, and I'm trying to repartition a DataFrame by U.S. state. I then want to break each partition out into its own RDD and save each one to a specific location:
```python
import json

from pyspark.sql import types

schema = types.StructType([
    types.StructField("details", types.StructType([
        types.StructField("state", types.StringType(), True)
    ]), True)
])

raw_rdd = spark_context.parallelize([
    '{"details": {"state": "AL"}}',
    '{"details": {"state": "AK"}}',
    '{"details": {"state": "AZ"}}',
    '{"details": {"state": "AR"}}',
    '{"details": {"state": "CA"}}',
    '{"details": {"state": "CO"}}',
    '{"details": {"state": "CT"}}',
    '{"details": {"state": "DE"}}',
    '{"details": {"state": "FL"}}',
    '{"details": {"state": "GA"}}'
]).map(lambda row: json.loads(row))

rdd = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state").rdd

for index in range(0, rdd.getNumPartitions()):
    # Keep only the rows that landed in partition `index`.
    partition = rdd.mapPartitionsWithIndex(
        lambda partition_index, partition: partition if partition_index == index else []
    ).coalesce(1)

    if partition.count() > 0:
        df = sql_context.createDataFrame(partition, schema=schema)

        for event in df.collect():
            print "Partition {0}: {1}".format(index, str(event))
    else:
        print "Partition {0}: No rows".format(index)
```

To test, I load a file from S3 with 50 rows (10 in the example), each with a different state in the details.state column. To keep the example above self-contained I've used parallelize instead, but the behavior is the same: I get the 50 partitions I asked for, but some are left empty and some carry entries for more than one state. Here's the output for the sample set of 10:
```
Partition 0: Row(details=Row(state=u'AK'))
Partition 1: Row(details=Row(state=u'AL'))
Partition 1: Row(details=Row(state=u'CT'))
Partition 2: Row(details=Row(state=u'CA'))
Partition 3: No rows
Partition 4: No rows
Partition 5: Row(details=Row(state=u'AZ'))
Partition 6: Row(details=Row(state=u'CO'))
Partition 6: Row(details=Row(state=u'FL'))
Partition 6: Row(details=Row(state=u'GA'))
Partition 7: Row(details=Row(state=u'AR'))
Partition 7: Row(details=Row(state=u'DE'))
Partition 8: No rows
Partition 9: No rows
```

My question: is the repartitioning strategy just a suggestion to Spark, or is there something fundamentally wrong with my code?
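For reference, here's a shorter way to observe the same partition assignment without looping over partitions manually. This is just a sketch: it relies on `pyspark.sql.functions.spark_partition_id`, which as I understand it tags each row with the id of the partition it actually landed in.

```python
from pyspark.sql import functions as F

# Tag every row with the partition it was hashed into, then eyeball the mapping.
df = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state")
df.withColumn("partition_id", F.spark_partition_id()) \
  .select("details.state", "partition_id") \
  .show()
```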
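And in case it matters, this is roughly the save step I'm working toward once each partition holds a single state. A sketch only: the S3 path is made up, and I'm flattening the nested state field first since, as far as I know, `partitionBy` wants a top-level column.

```python
from pyspark.sql import functions as F

# Pull the nested state up to a top-level column, then write one directory per state.
flat = df.select(F.col("details.state").alias("state"), "details")
flat.write.partitionBy("state").json("s3://my-bucket/events")  # hypothetical output path
```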