
I have two datasets that I want to INNER JOIN to give me a whole new table with the desired data. I used SQL and managed to get it. But now I want to try it with map() and filter() — is it possible?

This is my code using Spark SQL:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object hello {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("quest9")
        val sc = new SparkContext(conf)
        val spark = SparkSession.builder().appName("quest9").master("local").getOrCreate()

        val zip_codes = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/zip.csv")
        val census = spark.read.format("csv").option("header", "true").load("/home/hdfs/Documents/quest_9/doc/census.csv")

        census.createOrReplaceTempView("census")
        zip_codes.createOrReplaceTempView("zip")

        //val query = spark.sql("SELECT * FROM census")
        val query = spark.sql("SELECT DISTINCT census.Total_Males AS male, census.Total_Females AS female FROM census INNER JOIN zip ON census.Zip_Code=zip.Zip_Code WHERE zip.City = 'Inglewood' AND zip.County = 'Los Angeles'")

        query.show()
        query.write.parquet("/home/hdfs/Documents/population/census/IDE/census.parquet")

        sc.stop()
      }
    }

2 Answers


The only sensible way, in general, to do this is to use the join() method of `Dataset`. I would urge you to question the need to use only map/filter for this: it is not intuitive, and it will probably confuse any experienced Spark developer (or, simply put, make them roll their eyes). It may also lead to scalability issues should the dataset grow.

That said, in your use case it is pretty simple to avoid using join. One possibility is to issue two separate jobs to Spark:

  1. fetch the zip code(s) that interest you
  2. filter the census data on that (those) zip code(s)

Step 1: collect the zip codes of interest (I am not sure of the exact syntax, as I do not have a Spark shell at hand, but it should be trivial to find the right one).

    // requires: import spark.implicits._
    val codes: Seq[String] = zip_codes
      // filter on the city
      .filter(row => row.getAs[String]("City").equals("Inglewood"))
      // filter on the county
      .filter(row => row.getAs[String]("County").equals("Los Angeles"))
      // map each row to its zip code as a String
      .map(row => row.getAs[String]("Zip_Code"))
      // collect on the driver side
      .collect()

Then again, writing it this way instead of using select/where will look pretty strange to anyone who is used to Spark.

Yet, the reason this will work is that we can be sure the set of zip codes matching a given town and county will be really small, so it is safe to perform a driver-side collection of the result.

Now on to step 2:

    census
      .filter(row => codes.contains(row.getAs[String]("Zip_Code")))
      .map( /* whatever to get your data out */ )
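Since the question asks whether this can be done with map() and filter() alone: stripped of Spark, the two steps above are ordinary collection operations. A minimal plain-Scala sketch of the same semantics — the case classes and sample values below are hypothetical stand-ins for the CSV data, not taken from the original files:

```scala
// Hypothetical in-memory stand-ins for zip.csv and census.csv.
case class Zip(zipCode: String, city: String, county: String)
case class Census(zipCode: String, totalMales: Int, totalFemales: Int)

val zipCodes = Seq(
  Zip("90301", "Inglewood", "Los Angeles"),
  Zip("94105", "San Francisco", "San Francisco")
)
val census = Seq(
  Census("90301", 12000, 12500),
  Census("94105", 30000, 29000)
)

// Step 1: collect the zip codes of interest.
val codes: Seq[String] = zipCodes
  .filter(_.city == "Inglewood")
  .filter(_.county == "Los Angeles")
  .map(_.zipCode)

// Step 2: keep only the census rows for those codes, then
// project just the two columns we care about (this projection
// is what the trailing .map() in step 2 stands for).
val result = census
  .filter(row => codes.contains(row.zipCode))
  .map(row => (row.totalMales, row.totalFemales))
  .distinct
```

The same shape carries over to Spark Datasets, except that step 1 must be collected to the driver before its result can be used inside the step-2 filter.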

2 Comments

Thank you for the help. Since I'm in "formation", I bet it's just to increase my knowledge. Anyway, you helped me a lot.
I just didn't quite get the last .map() in step two. I did a .collect() instead, and it gets what I want plus the columns I don't want.

What you need is a join; your query roughly translates to:

    census.as("census")
      .join(
        broadcast(zip_codes
          .where($"City" === "Inglewood")
          .where($"County" === "Los Angeles")
          .as("zip")),
        Seq("Zip_Code"),
        "inner" // "leftsemi" would also be sufficient
      )
      .select(
        $"census.Total_Males".as("male"),
        $"census.Total_Females".as("female")
      ).distinct()
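For intuition, the inner join on Zip_Code followed by select/distinct can be expressed as a flatMap over plain collections. A sketch on hypothetical in-memory data (the case classes and values are illustrative, not from the original CSVs):

```scala
// Hypothetical stand-ins for the two tables.
case class Zip(zipCode: String, city: String, county: String)
case class Census(zipCode: String, totalMales: Int, totalFemales: Int)

val zipCodes = Seq(Zip("90301", "Inglewood", "Los Angeles"))
val census = Seq(
  Census("90301", 12000, 12500),
  Census("10001", 8000, 8100)
)

// An inner join is conceptually a flatMap + filter on the join key:
// each census row is paired with every matching zip row, and rows
// without a match are dropped.
val joined = census.flatMap { c =>
  zipCodes
    .filter(z =>
      z.zipCode == c.zipCode &&
      z.city == "Inglewood" &&
      z.county == "Los Angeles")
    .map(_ => (c.totalMales, c.totalFemales)) // the select(...)
}.distinct                                    // the distinct()
```

This nested-loop picture is only a mental model; Spark's join() chooses an actual join strategy (broadcast hash join here, thanks to the broadcast() hint) rather than looping.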

1 Comment

Appreciate the help!
