
I apologise for what will probably be a simple question, but I'm struggling to get to grips with parsing RDDs with Scala/Spark. I have an RDD created from a CSV, read in with

val partitions: RDD[(String, String, String, String, String)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(0), columns(1), columns(2), columns(3), columns(4))
  })
})

When I output this to a file with

partitions.saveAsTextFile(file) 

I get the output with the parentheses on each line. I don't want these parentheses. I'm struggling in general to understand what is happening here. My background is with low level languages and I'm struggling to see through the abstractions to what it's actually doing. I understand the mappings but it's the output that is escaping me. Can someone either explain to me what is going on in the line (columns(0), columns(1), columns(2), columns(3), columns(4)) or point me to a guide that simply explains what is happening?
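
If it helps, here is the smallest snippet I could come up with (values made up) that reproduces the parentheses I'm seeing, which makes me think that line builds a tuple and the output is just its toString:

val row = ("5.1", "3.5", "1.4", "0.2", "setosa")
println(row) // prints (5.1,3.5,1.4,0.2,setosa)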

My ultimate goal is to be able to manipulate files that are on HDFS in Spark to put them in formats suitable for MLlib. I'm unimpressed with the Spark and Scala guides, as they read like poorly annotated Javadocs and don't really explain anything.

Thanks in advance.

Dean

  • Could you please show a sample of your output file? Commented Mar 26, 2015 at 14:13
  • Output file shown below: (3.5,1.4,0.2,setosa) (3,1.4,0.2,setosa) (3.2,1.3,0.2,setosa) (3.1,1.5,0.2,setosa) (3.6,1.4,0.2,setosa) Commented Mar 26, 2015 at 14:58

1 Answer


I would just convert your tuple to the string format you want. For example, to create |-delimited output:

partitions.map{ tup => s"${tup._1}|${tup._2}|${tup._3}|${tup._4}|${tup._5}" }

or using pattern matching (which incurs a little more runtime overhead):

partitions.map{ case (a,b,c,d,e) => s"$a|$b|$c|$d|$e" }

I'm using the string interpolation feature of Scala (note the s"..." format).
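
Putting it together, the full write-out would look something like this (the output path here is just a placeholder):

val lines = partitions.map { case (a, b, c, d, e) => s"$a|$b|$c|$d|$e" }
lines.saveAsTextFile("hdfs:///tmp/delimited-output") // one delimited record per line, no parentheses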

Side note, you can simplify your example by just mapping over the RDD as a whole, rather than the individual partitions:

val parser = new CSVParser(',')
val partitions: RDD[(String, String, String, String, String)] = withoutHeader.map { line =>
  val columns = parser.parseLine(line)
  (columns(0), columns(1), columns(2), columns(3), columns(4))
}
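
Since you mention MLlib: once you have the tuples, you can map them straight into LabeledPoints. A rough sketch, assuming the first four columns are numeric features and the fifth is the iris species (the species-to-index mapping here is just illustrative):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// hypothetical label encoding for the species column
val speciesIndex = Map("setosa" -> 0.0, "versicolor" -> 1.0, "virginica" -> 2.0)
val points = partitions.map { case (a, b, c, d, species) =>
  LabeledPoint(speciesIndex(species), Vectors.dense(a.toDouble, b.toDouble, c.toDouble, d.toDouble))
}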

4 Comments

If I were mapping over the RDD as a whole, does that have performance implications when I'm working with a huge file on a distributed system via a YARN job? Working on the individual partitions feels like it should be more efficient with YARN, although that is just based on how I think the parallelisation might work.
+1 I went too complex without thinking simple first :). As to the above question: it depends on how expensive the CSVParser really is. The biggest reason for using mapPartitions is to cut down on instantiation. If it is a simple parser, then creating it once and sending it over the wire should be fine. All you are doing is micro-managing where it seems unnecessary.
Agreed. RDD.map actually uses one of the mapPartitions variants under the hood, so unless you need the finer-grained control, there isn't much benefit in calling it explicitly.
Perfect. Thanks both for your input. Justin, your solution may have been more complex but I appreciated the explanation of what was going on in the background. Really useful.
