I have an RDD with this structure

 RDD[((String, String), List[(Int, Timestamp, String)])] 

and data

 ((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101)))
 ((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101)))
 ((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101), (2,2011-10-05 00:00:00.0,C101), (3,2006-12-25 00:00:00.0,C101)))

Think of this as a table, where

 '(D2,Saad Arif)'

is the key and

 'List((4,2011-10-05 00:00:00.0,C101), (5,2010-01-27 00:00:00.0,C101))'

holds the rows for that key. For each row, I want to check whether there is a record (history) with code 'C101' dated two or more years earlier; if so, set the level to 2, otherwise to 1. The resulting RDD should look like this:

((D2,Saad Arif),List((4,2011-10-05 00:00:00.0,C101, 1), (5,2010-01-27 00:00:00.0,C101, 1)))
((D3,Faran Abid),List((7,2016-10-05 00:00:00.0,C101, 1)))
((D1,Atif Shahzad),List((1,2012-04-15 00:00:00.0,C101, 2), (2,2011-10-05 00:00:00.0,C101, 2), (3,2006-12-25 00:00:00.0,C101, 1)))

Notice the new level appended to each row. How can I do this with map or flatMap?

  • Do you understand the difference between map and flatMap? This one is clearly a use case for map. Commented Aug 30, 2016 at 10:39
  • Also... Please look at your past questions. And if someone has correctly answered your question, make sure to appreciate that person's effort by marking his answer as accepted. Commented Aug 30, 2016 at 10:41
  • @Sarvesh Kumar Singh Yes, I have a basic idea of map and flatMap, but I don't know how to use them in this scenario. Commented Aug 30, 2016 at 10:46
  • @Sarvesh Kumar Singh I have marked answers accepted. Commented Aug 30, 2016 at 10:49
  • stackoverflow.com/a/39124475/1151929 and stackoverflow.com/a/39013506/1151929 Commented Aug 30, 2016 at 10:58
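
The map versus flatMap distinction raised in these comments can be illustrated on plain Scala collections, whose `map` and `flatMap` behave analogously to the RDD methods of the same name. This is only a minimal sketch: the object name `MapVsFlatMap` and the trimmed-down sample data (timestamps left out) are assumptions made for illustration, not part of the question.

```scala
object MapVsFlatMap {
  // Hypothetical sample shaped like the question's data, timestamps omitted for brevity.
  val grouped: List[((String, String), List[(Int, String)])] = List(
    (("D2", "Saad Arif"), List((4, "C101"), (5, "C101"))),
    (("D3", "Faran Abid"), List((7, "C101")))
  )

  // map: one output element per key; the inner list is transformed but stays nested.
  val withLevel: List[((String, String), List[(Int, String, Int)])] =
    grouped.map { case (key, rows) =>
      (key, rows.map { case (id, code) => (id, code, 1) })
    }

  // flatMap: each key emits all of its rows and the results are concatenated.
  val flattened: List[(String, String, Int, String)] =
    grouped.flatMap { case ((pid, name), rows) =>
      rows.map { case (id, code) => (pid, name, id, code) }
    }
}
```

Adding a level to each row keeps one output per key, so `map` fits; `flatMap` only becomes relevant when the nested rows should be turned into one flat record each.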

1 Answer

import java.time.LocalDate
import java.time.Period

// Sentinel later than any real date, used as the starting value of the fold.
val futureDate = LocalDate.of(2100, 1, 1)

val yourRequiredRdd = yourRdd
  .map { case (t, list) =>
    // The RDD holds java.sql.Timestamp values, so convert them rather than parse strings.
    val list1 = list.map { case (id, ts, id2) =>
      (id, ts.toLocalDateTime.toLocalDate, id2)
    }
    // Oldest date among the C101 entries for this key.
    val oldestDate = list1
      .filter { case (_, _, id2) => id2 == "C101" }
      .map(_._2)
      .foldLeft(futureDate) { (oldestDate, date) =>
        val period = Period.between(oldestDate, date)
        if (!period.isNegative) oldestDate else date
      }
    // Level 2 when a C101 entry is two or more years after the oldest one, otherwise 1.
    val newList = list1.map {
      case (id, date, "C101") =>
        val periodFromOldestDate = Period.between(oldestDate, date)
        val extraNumber = if (periodFromOldestDate.getYears >= 2) 2 else 1
        (id, date, "C101", extraNumber)
      case (id, date, id2) =>
        (id, date, id2, 1)
    }
    (t, newList)
  }
  // Flatten to one record per row.
  .flatMap { case ((pid, name), list) =>
    list.map { case (id, date, code, level) =>
      (id, name, code, pid, date, level)
    }
  }

5 Comments

Thank you very much. Can you briefly explain the code that finds 'oldestDate'?
Filter out the entries other than C101, then map to keep just the dates. Now you have a list of dates; fold over that list to find the oldest date.
requiredRDD has the structure 'RDD[((String, String), List[(Int, LocalDate, String, Int)])]'. How can I map it to 'RDD[(Int, String, String, String, LocalDate, Int)]', using something like map { case ((pid, name), (id, date, code, level)) => (id, name, code, pid, date, level) }?
Added to the answer.
Thanks, I needed that.
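
The filter, map, and fold steps described in the comments above can be tried in isolation on an ordinary list. The following is a rough sketch under stated assumptions: the object name `OldestDate`, the helper `level`, and the sample rows are hypothetical, and the fold compares dates with `LocalDate.isBefore` instead of the `Period.isNegative` check used in the answer.

```scala
import java.time.{LocalDate, Period}

object OldestDate {
  // Hypothetical rows for a single key: (id, date, code).
  val rows: List[(Int, LocalDate, String)] = List(
    (1, LocalDate.of(2012, 4, 15), "C101"),
    (2, LocalDate.of(2011, 10, 5), "C101"),
    (3, LocalDate.of(2006, 12, 25), "C101")
  )

  // Sentinel far in the future, so any real date replaces it during the fold.
  val futureDate: LocalDate = LocalDate.of(2100, 1, 1)

  // Keep only C101 rows, project to their dates, then fold down to the earliest date.
  val oldest: LocalDate = rows
    .filter { case (_, _, code) => code == "C101" }
    .map { case (_, date, _) => date }
    .foldLeft(futureDate)((acc, date) => if (date.isBefore(acc)) date else acc)

  // Level 2 when the row is at least two full years after the oldest C101 date.
  def level(date: LocalDate): Int =
    if (Period.between(oldest, date).getYears >= 2) 2 else 1
}
```

If a key has no C101 rows at all, the fold simply returns the sentinel, which is why the starting value needs to be later than any date in the data.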
