1

I want to use flatMap to implement filter() + map(), as in the following code: there are three if statements, each of which may output one Tuple2; otherwise an empty Array[Tuple2] is output.

Is there a more elegant way to write this function?

  rddx.flatMap { case (arr: Array[String]) =>
    val url_parts = arr(1).split("/")
    if (url_parts.length > 7) {
      val pid = url_parts(4)
      val lid = url_parts(7).split("_")
      if (lid.length == 2) {
        val sid = lid(0)
        val eid = lid(1)
        if (eid.length > 0 && eid(0) == 'h') {
          Array((pid, 1))
        } else new Array[(String, Int)](0)
      } else Array((pid, 1))
    } else new Array[(String, Int)](0)
  }
4 Comments

  • Is collect available on an RDD? If so, that should be what you are looking for. The collect function in Scala is equivalent to map + filter. Commented Apr 28, 2015 at 13:34
  • collect and filter+map are totally different in Spark: collect will read data back from all the remote workers... so are there other methods? Commented Apr 28, 2015 at 13:37
  • There are two variations of collect: one that takes no args and one that takes a PartialFunction; I was referring to the latter. Looking at the Scaladoc, that method seems to do what it would do on a normal Scala collection. Are you sure collect is not what you want? Commented Apr 28, 2015 at 13:41
  • Yes, we cannot use the collect function in Spark :-) Commented Apr 28, 2015 at 13:43
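For what it's worth, the PartialFunction overload mentioned in the comments behaves like filter + map in a single pass. A minimal sketch on a plain Scala List (the data here is made up; Spark's RDD.collect(pf) has the same shape, evaluated per partition, unlike the no-arg collect() which fetches results to the driver):

```scala
// collect(PartialFunction) keeps only elements the partial function is
// defined at, and transforms them in the same pass -- i.e. filter + map.
val parts = List("a/b/c", "a", "x/y/z")

val counted = parts.collect {
  case s if s.split("/").length == 3 => (s.split("/")(0), 1)
}
// counted == List(("a", 1), ("x", 1))
```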

2 Answers

5

You could use a for-comprehension. Granted, this desugars into a chain of flatMap, map, and filter, but Spark will group those into one stage anyway, so there shouldn't be any performance penalty.

  for {
    arr <- rddx
    url_parts = arr(1).split("/")
    if url_parts.length > 7
    pid = url_parts(4)
    lid = url_parts(7).split("_")
    if lid.length == 2
    sid = lid(0)
    eid = lid(1)
    if eid.length > 0 && eid(0) == 'h'
  } yield (pid, 1)
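For intuition, here is roughly the chain the compiler generates, hand-desugared and shown on a plain Scala List with made-up data (value definitions become maps over tuples, guards become filters; on an RDD the same chain of map/filter nodes is built):

```scala
// Hand-desugared sketch of the for-comprehension above. Each group of
// value definitions becomes a map, each guard becomes a filter.
val data = List(Array("id1", "a/b/c/d/PID/f/g/SID_hEID"),
                Array("id2", "too/short"))

val res = data
  .map(arr => (arr, arr(1).split("/")))
  .filter { case (_, parts) => parts.length > 7 }
  .map { case (arr, parts) => (parts(4), parts(7).split("_")) }
  .filter { case (_, lid) => lid.length == 2 }
  .map { case (pid, lid) => (pid, lid(0), lid(1)) }
  .filter { case (_, _, eid) => eid.length > 0 && eid(0) == 'h' }
  .map { case (pid, _, _) => (pid, 1) }
// res == List(("PID", 1))
```

Note the alternating map/filter nodes here line up with the toDebugString output shown for the RDD version.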

Here's the output of toDebugString to show there's only one stage:

  scala> res.toDebugString
  res2: String =
  (8) MapPartitionsRDD[7] at map at <console>:24 []
   |  MapPartitionsRDD[6] at filter at <console>:24 []
   |  MapPartitionsRDD[5] at map at <console>:24 []
   |  MapPartitionsRDD[4] at filter at <console>:24 []
   |  MapPartitionsRDD[3] at map at <console>:24 []
   |  MapPartitionsRDD[2] at filter at <console>:24 []
   |  MapPartitionsRDD[1] at map at <console>:24 []
   |  ParallelCollectionRDD[0] at parallelize at <console>:21 []

1 Comment

I am sure you can work that out yourself from my answer. If something is unclear in my solution, I'm happy to explain.
2

"the right tool for the job". In this case, all that parsing could be done using a regex:

  val pidCapture = "[\\w]+/[\\w]+/[\\w]+/([\\w]+)/[\\w]+/[\\w]+/[^_]+_h[\\w]+.*".r
  rdd.map(arr => arr(1)).collect { case pidCapture(pid) => (pid, 1) }

Example in the REPL, starting from the URLs as strings:

  val urls = List(
    "one/two/three/pid1/four/five/six/sid_heid",
    "one/two/three/pid2/four/five/six/sid_noth",
    "one/two/three/pid3/four/five",
    "one/two/three/pid4/four/five/six/sid_heid/more")
  val rdd = sc.parallelize(urls)
  val regex = "[\\w]+/[\\w]+/[\\w]+/([\\w]+)/[\\w]+/[\\w]+/[^_]+_h[\\w]+.*".r
  val pids = rdd.collect { case regex(pid) => (pid, 1) }
  val result = pids.collect()
  // result: Array[(String, Int)] = Array((pid1,1), (pid4,1))

2 Comments

my god=。= you are clever:-)
Or better still, using scala-parser-combinators
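A rough sketch of what that suggestion could look like (the object name and segment layout are assumptions mirroring the regex example above, not code from the thread; scala-parser-combinators is a separate module since Scala 2.11):

```scala
import scala.util.parsing.combinator.RegexParsers

// Hypothetical sketch: a parser that accepts
//   seg/seg/seg/<pid>/seg/.../<sid>_h<eid>
// and captures pid, mirroring the regex answer above.
object PidParser extends RegexParsers {
  // A path segment: anything but '/' or '_'.
  private val seg: Parser[String] = """[^/_]+""".r

  // Skip 3 leading segments, capture the 4th as pid, then require the
  // last segment to look like <sid>_h<eid>.
  private val url: Parser[String] =
    repN(3, seg <~ "/") ~> (seg <~ "/") ~ rep(seg <~ "/") ~ (seg ~ ("_h" ~> """\w+""".r)) ^^ {
      case pid ~ _ ~ _ => pid
    }

  def extractPid(s: String): Option[String] =
    parse(url, s) match {
      case Success(pid, _) => Some(pid)
      case _               => None
    }
}

// PidParser.extractPid("one/two/three/pid1/four/five/six/sid_heid") // Some("pid1")
// PidParser.extractPid("one/two/three/pid2/four/five/six/sid_noth") // None
```

On an RDD this would slot into the same collect pattern as the regex version, e.g. rdd.flatMap(PidParser.extractPid).map((_, 1)).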
