
Here is my problem: I've got this RDD:

a = [[u'PNR1', u'TKT1', u'TEST', u'a2', u'a3'],
     [u'PNR1', u'TKT1', u'TEST', u'a5', u'a6'],
     [u'PNR1', u'TKT1', u'TEST', u'a8', u'a9']]
rdd = sc.parallelize(a)

Then I try :

from pyspark.sql.functions import collect_list

(rdd.map(lambda x: (x[0], x[1], x[2], list(x[3:])))
    .toDF(["col1", "col2", "col3", "col4"])
    .groupBy("col1", "col2", "col3")
    .agg(collect_list("col4"))
    .show())

Finally I should find this:

[col1,col2,col3,col4]=[u'PNR1',u'TKT1',u'TEST',[[u'a2',u'a3'],[u'a5',u'a6'],[u'a8',u'a9']]]

But the problem is that I can't collect a list.

If anyone can help me, I would appreciate it.

  • What do you mean by "I can't collect a list" ? Commented Oct 3, 2017 at 7:35
  • The collect_list function can't receive a list... I'm trying to collect a list of lists. Commented Oct 3, 2017 at 8:14
  • Which version of Spark are you using? Commented Oct 3, 2017 at 8:15
  • Spark version 1.6.2 Commented Oct 3, 2017 at 8:50
  • Can you switch to Spark 2+? Spark 1.6 uses a Hive UDAF to perform collect_list, which has been re-implemented in Spark 2+ to accept lists of lists (see the sketch below). Commented Oct 3, 2017 at 9:10
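
For reference, a minimal sketch of what this comment describes, assuming Spark 2.x (where collect_list can aggregate complex columns such as arrays); the use of array and the short column names here are my own illustration, not from the question:

from pyspark.sql.functions import array, collect_list

df = rdd.toDF(["col1", "col2", "col3", "c4", "c5"])
# collect_list over an array column keeps each row's pair of values together
(df.groupBy("col1", "col2", "col3")
   .agg(collect_list(array("c4", "c5")).alias("col4"))
   .show(truncate=False))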

3 Answers


I finally found a solution. It is not the best way, but I can continue working...

from pyspark.sql.functions import udf, col, lit, concat, collect_set
from pyspark.sql.types import ArrayType, StringType

def example(lista):
    d = [[] for x in range(len(lista))]
    for index, elem in enumerate(lista):
        d[index] = elem.split("@")
    return d

# A UDF returning a list of lists needs ArrayType(ArrayType(StringType())) as
# its return type; it is defined here but not actually used below.
example_udf = udf(example, ArrayType(ArrayType(StringType())))

a = [[u'PNR1', u'TKT1', u'TEST', u'a2', u'a3'],
     [u'PNR1', u'TKT1', u'TEST', u'a5', u'a6'],
     [u'PNR1', u'TKT1', u'TEST', u'a8', u'a9']]
rdd = sc.parallelize(a)
df = rdd.toDF(["col1", "col2", "col3", "col4", "col5"])

# Join col4 and col5 with '@', drop the originals, then collect the joined strings
df2 = (df.withColumn('col6', concat(col('col4'), lit('@'), col('col5')))
         .drop(col("col4"))
         .drop(col("col5"))
         .groupBy([col("col1"), col("col2"), col("col3")])
         .agg(collect_set(col("col6")).alias("col6")))

# Split the '@'-joined strings back into lists of values
df2.map(lambda x: (x[0], x[1], x[2], example(x[3]))).collect()

And it gives:

[(u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']])] 

Hope this solution can help someone else.

Thanks for all your answers.




This might do your job (or give you some ideas to proceed further)...

One idea is to convert your col4 to a primitive data type, i.e. a string:

from pyspark.sql.functions import collect_list
import pandas as pd

a = [[u'PNR1', u'TKT1', u'TEST', u'a2', u'a3'],
     [u'PNR1', u'TKT1', u'TEST', u'a5', u'a6'],
     [u'PNR1', u'TKT1', u'TEST', u'a8', u'a9']]
rdd = sc.parallelize(a)

df = rdd.map(lambda x: (x[0], x[1], x[2],
                        '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1", "col2", "col3", "col4"])

df.groupBy("col1", "col2", "col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
# [u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']]

UPDATE (after your own answer):

I really thought the point I had reached above was enough for you to adapt further to your needs, and I didn't have time at the moment to do it myself; so, here it is (after modifying my df definition to get rid of the parentheses, it is just a matter of a single list comprehension):

df = rdd.map(lambda x: (x[0], x[1], x[2],
                        ' '.join(str(e) for e in x[3:]))).toDF(["col1", "col2", "col3", "col4"])

# temp list:
ff = df.groupBy("col1", "col2", "col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
ff
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']]

# final list of lists:
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]]
ll

which gives your initially requested result:

[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]] # requested output 

This approach has certain advantages compared with the one provided in your own answer:

  • It avoids PySpark UDFs, which are known to be slow
  • All the processing is done on the final (and hopefully much smaller) aggregated data, instead of adding and removing columns, running map functions, and applying UDFs on the initial (presumably much bigger) data

2 Comments

Actually I need a list of lists in col4; in your answer I get strings, e.g. (a2 a3), whereas I need [[a2,a3],[a5,a6],[a8,a9]]
@CarlosLopezSobrino isn't the updated answer exactly what you asked for?

Since you cannot update to 2.x, your only option is the RDD API. Replace your current code with:

rdd.map(lambda x: ((x[0], x[1], x[2]), list(x[3:]))).groupByKey().toDF() 
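
A possible caveat (my assumption, not part of this answer): groupByKey returns the grouped values as a ResultIterable, for which toDF may fail to infer a schema, so materializing the values as a plain list first can help. A minimal sketch:

# Group the trailing fields by the (col1, col2, col3) key, then turn the
# grouped values into a plain list so they can be collected or converted.
grouped = (rdd.map(lambda x: ((x[0], x[1], x[2]), list(x[3:])))
              .groupByKey()
              .mapValues(list))

grouped.map(lambda kv: (kv[0][0], kv[0][1], kv[0][2], kv[1])).collect()
# [(u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']])]
# (element order inside the group is not guaranteed)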

