
I have a bunch of tuples in the form of composite keys and values. For example:

tfile.collect() = [(('id1', 'pd1', 't1'), 5.0),
                   (('id2', 'pd2', 't2'), 6.0),
                   (('id1', 'pd1', 't2'), 7.5),
                   (('id1', 'pd1', 't3'), 8.1)]

I want to perform SQL-like operations on this collection, where I can aggregate the information based on id[1..n] or pd[1..n]. I want to implement this using the vanilla PySpark APIs and not SQLContext. In my current implementation I am reading from a bunch of files and merging the RDDs:

def readfile():
    fr = range(6, 23)
    tfile = sc.union([sc.textFile(basepath + str(f) + ".txt")
                      .map(lambda view, f=f: set_feature(view, f))  # f=f binds the current file number; otherwise every lambda would see the last f
                      .reduceByKey(lambda a, b: a + b)
                      for f in fr])
    return tfile

I intend to create an aggregated array as the value. For example:

agg_tfile = [(('id1', 'pd1'), [5.0, 7.5, 8.1])]

where 5.0, 7.5, 8.1 represent [t1, t2, t3]. I am currently achieving this with vanilla Python code using dictionaries. It works fine for smaller data sets, but I worry that it may not scale for larger ones. Is there an efficient way of achieving the same using the PySpark APIs?
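Roughly, my current dictionary-based approach looks like this (a simplified sketch, not my exact code; it runs on the driver after collect(), which is what worries me):

from collections import defaultdict

# driver-side aggregation: group values by (id, pd), ordered by t
agg = defaultdict(list)
for (id_, pd_, t), value in tfile.collect():
    agg[(id_, pd_)].append((t, value))

agg_tfile = [(key, [v for _, v in sorted(tv)]) for key, tv in agg.items()]
# e.g. [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]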


2 Answers


My guess is that you want to transpose the data according to multiple fields.

A simple way is to concatenate the target fields that you want to group by and make that the key of a pair RDD. For example:

lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
rdd = (lines
       .map(lambda x: x.split(','))
       .map(lambda x: (x[0] + ', ' + x[1], x[3]))
       .reduceByKey(lambda a, b: a + ', ' + b))
print(rdd.collect())

Then you will get the transposed result.

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')] 
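If you would rather keep the key as a tuple and the values as floats instead of a concatenated string, the same idea works with combineByKey; a sketch along the same lines (this variation is mine, not part of the answer above):

rdd2 = (lines
        .map(lambda x: x.split(','))
        .map(lambda x: ((x[0], x[1]), float(x[3])))
        .combineByKey(lambda v: [v],             # start a list from the first value
                      lambda acc, v: acc + [v],  # add a value within a partition
                      lambda a, b: a + b))       # merge lists across partitions

print(rdd2.collect())
# [(('id1', 'pd1'), [5.0, 7.5, 8.1]), (('id2', 'pd2'), [6.0])]  (order may vary)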

3 Comments

This is definitely an interesting way of solving this. I figured out another way of achieving the same, but I guess your method might be much faster than mine. I am sharing my own solution as well.
Does PySpark not have groupByKey?
PySpark has the method groupByKey. However, the question aims to group records together based on two fields, instead of doing an aggregation such as SELECT sum(value) FROM data GROUP BY id, pd, so groupByKey may not help.
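For what it is worth on the groupByKey point: it does accept a composite (tuple) key, so grouping on two fields and then materializing the values as a list is possible; a small sketch (my illustration, not from the original comments):

# tfile is the ((id, pd, t), value) RDD from the question
by_id_pd = (tfile
            .map(lambda kv: ((kv[0][0], kv[0][1]), kv[1]))  # re-key by (id, pd)
            .groupByKey()                                    # (key, iterable of values)
            .mapValues(list))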

I group records as ((id1, t1), [(p1, 5.0), (p2, 6.0)]) and so on in my map function. Later, after a reduceByKey, I apply map_group, which creates an array for [p1, p2, ...] and fills in the values at their respective positions.

import numpy as np

def map_group(pgroup):
    # build a fixed-size feature vector and drop each value into its pd's slot
    x = np.zeros(19)
    x[0] = 1
    value_list = pgroup[1]
    for val in value_list:
        fno = val[0].split('.')[0]
        x[int(fno) - 5] = val[1]
    return x

tgbr = (tfile
        .map(lambda d: ((d[0][0], d[0][2]), [(d[0][1], d[1])]))
        .reduceByKey(lambda p, q: p + q)
        .map(lambda d: (d[0], map_group(d))))

This does feel like an expensive solution in terms of computation, but it works for now.
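If the per-key list building in reduceByKey ever becomes the bottleneck, one option is aggregateByKey, which fills the fixed-size vector as the values arrive instead of concatenating lists first. A sketch under the same assumptions map_group makes (a 19-element vector, the fno - 5 slot mapping, the x[0] = 1 flag, and each slot being filled at most once per key):

import numpy as np

def seq_op(vec, pv):
    # pv is (pd, value); derive the slot index the same way map_group does
    vec = vec.copy()
    fno = int(pv[0].split('.')[0])
    vec[fno - 5] = pv[1]
    return vec

def comb_op(a, b):
    # partial vectors are zero except for the slots they filled, so element-wise
    # addition merges them (assuming no slot is filled on two different partitions)
    return a + b

def finalize(vec):
    # mirror the x[0] = 1 flag set in map_group
    vec = vec.copy()
    vec[0] = 1.0
    return vec

tgbr = (tfile
        .map(lambda d: ((d[0][0], d[0][2]), (d[0][1], d[1])))
        .aggregateByKey(np.zeros(19), seq_op, comb_op)
        .mapValues(finalize))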

