
I have a dataset that contains tab-separated lines in the format:

Title<\t>Text 

Now for every word in Text, I want to create a (Word,Title) pair. For instance:

ABC Hello World 

gives me

(Hello, ABC) (World, ABC) 

Using Scala, I wrote the following:

val file = sc.textFile("s3n://file.txt")
val title = file.map(line => line.split("\t")(0))
val wordtitle = file.map(line => (line.split("\t")(1).split(" ").map(word => (word, line.split("\t")(0)))))

But this gives me the following output:

[Lscala.Tuple2;@2204b589
[Lscala.Tuple2;@632a46d1
[Lscala.Tuple2;@6c8f7633
[Lscala.Tuple2;@3e9945f3
[Lscala.Tuple2;@40bf74a0
[Lscala.Tuple2;@5981d595
[Lscala.Tuple2;@5aed571b
[Lscala.Tuple2;@13f1dc40
[Lscala.Tuple2;@6bb2f7fa
[Lscala.Tuple2;@32b67553
[Lscala.Tuple2;@68d0b627
[Lscala.Tuple2;@8493285

How do I solve this?

Further details

What I want to achieve is to count the number of Words that occur in a Text for a particular Title.

The subsequent code that I have written is:

val wordcountperfile = file.map(line => (line.split("\t")(1).split(" ").flatMap(word => word), line.split("\t")(0)))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

But it does not work. Any input on this would be appreciated. Thanks!


4 Answers


So... in Spark you work with a distributed data structure called an RDD. RDDs provide functionality similar to Scala's collection types.

val fileRdd = sc.textFile("s3n://file.txt")          // RDD[String]
val splitRdd = fileRdd.map(line => line.split("\t")) // RDD[Array[String]]

val yourRdd = splitRdd.flatMap(arr => {
  val title = arr(0)
  val text = arr(1)
  val words = text.split(" ")
  words.map(word => (word, title))
})                                                   // RDD[(String, String)]

// Now, if you want to print this...
yourRdd.foreach({ case (word, title) => println(s"{ $word, $title }") })

// if you want to count (this count is for non-unique words),
val countRdd = yourRdd
  .groupBy({ case (word, title) => title })          // group by title
  .map({ case (title, iter) => (title, iter.size) }) // count for every title

4 Comments

Thanks for the quick post. However, it gives me an error on seq.length. Basically, I want to store the ((Word, Title), COUNT) in a file.
I'm using this to count the words: val countRdd = yourRdd.map(title => (title, 1)).reduceByKey(_ + _). Can you confirm it's correct?
Yes... because Iterable does not have length; it has size. Changing the answer.
Yes... your reduceByKey approach is also OK. I am giving you the number of words per title, i.e. (title, count).
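For the ((Word, Title), COUNT) output mentioned in the comments, here is a minimal sketch building on yourRdd from the answer above; the output path is a placeholder:

val pairCountRdd = yourRdd
  .map { case (word, title) => ((word, title), 1) } // key by the (word, title) pair
  .reduceByKey(_ + _)                               // count occurrences of each pair

// save one tab-separated "word<TAB>title<TAB>count" line per pair; the path is hypothetical
pairCountRdd
  .map { case ((word, title), count) => s"$word\t$title\t$count" }
  .saveAsTextFile("s3n://output/word-title-counts")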

This is how it can be solved using the newer DataFrame API. First, read the data using "\t" as the delimiter:

import org.apache.spark.sql.functions.{count, explode, split}
import spark.implicits._

val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")

Then, split the text column on space and explode to get one word per row.

val df2 = df.select($"title", explode(split($"text", " ")).as("words")) 

Finally, group on the title column and count the number of words for each.

val countDf = df2.groupBy($"title").agg(count($"words")) 
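To inspect the result, or to store it to a file as the question asks, something along these lines should work; the output path is a placeholder:

countDf.show()

// write the per-title counts as tab-separated text; the path is hypothetical
countDf.write
  .option("delimiter", "\t")
  .csv("s3n://output/title-counts")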



Another version using the DataFrame API:

// read into DataFrame
val viewsDF = spark.read.text("s3n://file.txt")

// Split
val splitedViewsDF = viewsDF
  .withColumn("col1", split($"value", "\\t").getItem(0))
  .withColumn("col2", split($"value", "\\s+").getItem(1))
  .drop($"value")

Sample

scala> val viewsDF = spark.read.text("spark-labs/data/wiki-pageviews.txt")
viewsDF: org.apache.spark.sql.DataFrame = [value: string]

scala> viewsDF.printSchema
root
 |-- value: string (nullable = true)

scala> viewsDF.limit(5).show
+------------------+
|             value|
+------------------+
| aa Main_Page 3 0 |
| aa Main_page 1 0 |
| aa User:Savh 1 0 |
| aa Wikipedia 1 0 |
|aa.b User:Savh 1 0|
+------------------+

scala> val splitedViewsDF = viewsDF.withColumn("col1", split($"value", "\\s+").getItem(0)).withColumn("col2", split($"value", "\\s+").getItem(1)).withColumn("col3", split($"value", "\\s+").getItem(2)).drop($"value")
splitedViewsDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]

scala> splitedViewsDF.printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)

scala> splitedViewsDF.limit(5).show
+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|  aa|Main_Page|   3|
|  aa|Main_page|   1|
|  aa|User:Savh|   1|
|  aa|Wikipedia|   1|
|aa.b|User:Savh|   1|
+----+---------+----+
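To get a per-title count as in the original question from this shape, one could continue along these lines (a sketch that assumes col1 plays the role of the title):

// count how many rows fall under each value of col1
val countsDF = splitedViewsDF.groupBy($"col1").count()
countsDF.show()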



The answer given above is not robust enough. .map(line => line.split("\t")) may cause:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 1485, ip-172-31-113-181.us-west-2.compute.internal, executor 10): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

in case the last column is empty. A better approach is explained here - Split 1 column into 3 columns in spark scala
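The underlying issue is that String.split drops trailing empty strings by default, so a line whose last field is empty produces a shorter array and the later index access fails. One common workaround (a sketch, not taken from the linked answer, building on fileRdd from the first answer) is to pass a negative limit so trailing empty fields are preserved:

// a negative limit keeps trailing empty strings, so arr(1) exists even when the text field is empty
val splitRdd = fileRdd.map(line => line.split("\t", -1))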

