I want to create a list of dates and topics. The topics are in a specific column, but each topic has extra information attached that I need to remove (the number in the data is not a count but the position of the topic in the text).
My idea was to generate a list of (Date, [(Topic, count), (Topic, count) ...])
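To make the goal concrete, here is a minimal sketch in plain Scala (no Spark) of what should happen to a single topics field. The helper names `topicsOf` and `countTopics` are hypothetical and only for illustration; the sketch assumes each entry looks like `topic,position;` as in my sample data.

```scala
// Hypothetical helper: strips the trailing ",position" from each "topic,position;" entry.
def topicsOf(field: String): List[String] =
  field.split(";").toList
    .map(_.trim)
    .filter(_.nonEmpty)
    .map { entry =>
      val i = entry.lastIndexOf(',')
      if (i >= 0) entry.substring(0, i).trim else entry
    }

// Hypothetical helper: counts how often each topic occurs.
def countTopics(topics: List[String]): Map[String, Int] =
  topics.groupBy(identity).map { case (topic, same) => (topic, same.size) }

val sample = "ja ja,10;nee nee,5;ja ja,4;nee nee,3;"
val counted = countTopics(topicsOf(sample))
// counted == Map("ja ja" -> 2, "nee nee" -> 2)
```

The final result per date would then be a pair such as `("2016", List(("ja ja", 2), ("nee nee", 2)))`.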
I wrote the mappings and reductions below, using a simplified data set.
Since it is a school assignment, I cannot use Spark SQL or DataFrames/Datasets.
val rdd1 = sc.textFile("./somedata.csv")
// LOADS DATA FROM CSV FILE

val rdd2 = rdd1.map(l => l.split("\t")).filter(x => x.length > 3)
// COLUMNS ARE SPLIT BY TABS, KEEP ONLY ROWS THAT HAVE AT LEAST 4 COLUMNS
// Array[Array[String]] = Array(
// Array(2015, jaap, arie, piet boosboom,10;arie koekwaus,20;moet dat,9;),
// Array(2015, sjaak, trekhaak, pieter jaap,20;krijg nou wat,90;),
// Array(2016, "", huh, ja ja,10;nee nee,5;ja ja,4;nee nee,3;),
// Array(2018, "", wat, huh huh,69;nou moe,70;ja ja, 12;))

val rdd3 = rdd2.map(x => (x(0)+"\t"+x(3))).map(l => l.split("\t"))
// NEED TO MAKE SURE I HAVE ONLY ELEMENTS FROM COL 1 and 3
// Array[Array[String]] = Array(
// Array(2015, piet boosboom,10; arie koekwaus,20; moet dat,9;),
// Array(2015, pieter jaap,20; krijg nou wat,90;),
// Array(2016, ja ja,10; nee nee,5;),
// Array(2018, huh huh,69; nou moe,70;))

val rdd4 = rdd3.map(x => (x(0), x(1).split(";")))
// SPLIT THE TOPICS COLUMN ON ";"
// Array[(String, Array[String])] = Array(
// (2015,Array(piet boosboom,10, arie koekwaus,20, moet dat,9)),
// (2015,Array(pieter jaap,20, krijg nou wat,90)),
// (2016,Array(ja ja,10, nee nee,5, ja ja,4, nee nee,3)),
// (2018,Array(huh huh,69, nou moe,70, ja ja, 12)))

val rdd5 = rdd4.map(x => (x._1, x._2.map(l => l.substring(0,l.indexOf(",")))))
// REMOVE JUNK AFTER EACH TOPIC
// Array[(String, Array[String])] = Array(
// (2015,Array(piet boosboom, arie koekwaus, moet dat)),
// (2015,Array(pieter jaap, krijg nou wat)),
// (2016,Array(ja ja, nee nee, ja ja, nee nee)),
// (2018,Array(huh huh, nou moe, ja ja)))

val rdd6 = rdd5.map(x => (x._1, x._2.map(l => (l, 1))))
// MAKE KEY VALUE PAIR OF TOPICS
// Array[(String, Array[(String, Int)])] = Array(
// (2015,Array((piet boosboom,1), (arie koekwaus,1), (moet dat,1))),
// (2015,Array((pieter jaap,1), (krijg nou wat,1))),
// (2016,Array((ja ja,1), (nee nee,1), (ja ja,1), (nee nee,1))),
// (2018,Array((huh huh,1), (nou moe,1), (ja ja,1))))

val rdd7 = rdd6.map(x => (x._1, List(x._2))).reduceByKey(_:::_)
// https://stackoverflow.com/questions/32248395/what-is-the-use-of-triple-colons-in-scala?lq=1
// CREATE LIST OF ARRAY OF TOPICS AND CONCATENATE THEM BASED ON DATE
// Array[(String, List[Array[(String, Int)]])] = Array(
// (2015,List(Array((piet boosboom,1), (arie koekwaus,1), (moet dat,1)), Array((pieter jaap,1), (krijg nou wat,1)))),
// (2016,List(Array((ja ja,1), (nee nee,1), (ja ja,1), (nee nee,1)))),
// (2018,List(Array((huh huh,1), (nou moe,1), (ja ja,1)))))

THE PROBLEM: In rdd7 (the last command) I get a List containing two arrays for 2015, and for some reason I cannot write a mapping that concatenates the arrays in that list into one.
In rdd7:

(2015,List(Array((piet boosboom,1), (arie koekwaus,1), (moet dat,1)), Array((pieter jaap,1), (krijg nou wat,1)))),

What I want in rdd8:

(2015,List(Array((piet boosboom,1), (arie koekwaus,1), (moet dat,1), (pieter jaap,1), (krijg nou wat,1))))

I tried things like:

val rdd7 = rdd6.map(x => (x._1, List(x._2))).reduceByKey(_:::_).group(_++_)

but somehow it does not work.
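For reference, this is a minimal sketch of the result I am after, written with plain Scala collections so it runs without Spark. The value `rows` is a hand-written stand-in for the contents of rdd6, and `mergedByDate` and `countsByDate` are hypothetical names. My assumption is that on the RDD itself the first step would correspond to concatenating the arrays directly per key, e.g. `rdd6.reduceByKey(_ ++ _)`, instead of wrapping each array in a List first.

```scala
// Stand-in for rdd6: one (date, topics) pair per input line.
val rows: List[(String, Array[(String, Int)])] = List(
  ("2015", Array(("piet boosboom", 1), ("arie koekwaus", 1), ("moet dat", 1))),
  ("2015", Array(("pieter jaap", 1), ("krijg nou wat", 1))),
  ("2016", Array(("ja ja", 1), ("nee nee", 1), ("ja ja", 1), ("nee nee", 1))),
  ("2018", Array(("huh huh", 1), ("nou moe", 1), ("ja ja", 1)))
)

// Step 1: concatenate all topic arrays that share the same date.
val mergedByDate: Map[String, List[(String, Int)]] =
  rows.groupBy(_._1).map { case (date, group) =>
    (date, group.flatMap(_._2.toList))
  }

// Step 2: sum the 1s per topic within each date.
val countsByDate: Map[String, Map[String, Int]] =
  mergedByDate.map { case (date, pairs) =>
    (date, pairs.groupBy(_._1).map { case (topic, ps) => (topic, ps.map(_._2).sum) })
  }
```

With the sample rows, `mergedByDate("2015")` holds all five 2015 topics in one list, and `countsByDate("2016")` is `Map("ja ja" -> 2, "nee nee" -> 2)`.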