
I want to join two DataFrames on a condition in Spark (Scala). The catch is that once a row in df1 matches a row in df2, that df1 row should not be matched against any other row in df2. Below is the sample data and the output I am trying to get.

    DF1
    --------------------------------
    Emp_id | Emp_Name | Address_id
    1      | ABC      | 1
    2      | DEF      | 2
    3      | PQR      | 3
    4      | XYZ      | 1

    DF2
    -----------------------
    Address_id | City
    1          | City_1
    1          | City_2
    2          | City_3
    REST       | Some_City

    Output DF
    ----------------------------------------
    Emp_id | Emp_Name | Address_id | City
    1      | ABC      | 1          | City_1
    2      | DEF      | 2          | City_3
    3      | PQR      | 3          | Some_City
    4      | XYZ      | 1          | City_1

Note: REST acts as a wildcard. Any Address_id value is considered equal to REST.

So in the sample above, Emp_Name "ABC" can match City_1, City_2, or Some_City. The output DF contains only City_1 because that is the first match found.

  • A question: is it correct that DF2 contains both City_1 and City_2 with Address_id = 1? Commented Apr 19, 2020 at 8:19

2 Answers


You seem to need custom logic for your join. I was able to come up with the UDF below.

Note that you may want to change the logic of the UDF to suit your requirements.

    import spark.implicits._
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.first

    // dataframe 1
    val df_1 = Seq(("1", "ABC", "1"), ("2", "DEF", "2"), ("3", "PQR", "3"), ("4", "XYZ", "1"))
      .toDF("Emp_Id", "Emp_Name", "Address_Id")

    // dataframe 2
    val df_2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST", "Some_City"))
      .toDF("Address_Id", "City_Name")

    // UDF logic: Address_Id 1 and 2 match only their exact key in df_2;
    // every other Address_Id matches only the REST row.
    val join_udf = udf((a: String, b: String) => {
      (a, b) match {
        case ("1", "1")  => true
        case ("1", _)    => false
        case ("2", "2")  => true
        case ("2", _)    => false
        case (_, "REST") => true
        case (_, _)      => false
      }
    })

    // Join on the UDF, then keep one city per employee.
    val dataframe_join = df_1
      .join(df_2, join_udf(df_1("Address_Id"), df_2("Address_Id")), "inner")
      .drop(df_2("Address_Id"))
      .orderBy($"City_Name")
      .groupBy($"Emp_Id", $"Emp_Name", $"Address_Id")
      .agg(first($"City_Name"))
      .orderBy($"Emp_Id")

    dataframe_join.show(false)

After applying the UDF in the join, you get all possible matching combinations.

Then, when you apply groupBy and use the first function in agg, you keep only one city per employee, which is the result you are looking for.

    +------+--------+----------+-----------------------+
    |Emp_Id|Emp_Name|Address_Id|first(City_Name, false)|
    +------+--------+----------+-----------------------+
    |1     |ABC     |1         |City_1                 |
    |2     |DEF     |2         |City_3                 |
    |3     |PQR     |3         |Some_City              |
    |4     |XYZ     |1         |City_1                 |
    +------+--------+----------+-----------------------+

Note that I used Spark 2.3. Hope this helps!
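The `orderBy` placed before `groupBy(...).agg(first(...))` above depends on row order surviving the shuffle, which Spark does not guarantee; `first` is documented as non-deterministic in that situation. Below is a minimal sketch of a UDF-free variant that ranks the candidates explicitly with a window. It assumes that "first" means an exact `Address_Id` match before the REST wildcard, with ties broken by city name; both are assumptions, since a DataFrame has no inherent row order. The names `FirstMatchJoin`, `firstMatch`, `Rule_Address_Id`, `priority`, and `rn` are illustrative and not part of the original answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, when}

object FirstMatchJoin {

  // Returns one city per employee: exact key matches win over the REST wildcard.
  def firstMatch(employees: DataFrame, addresses: DataFrame): DataFrame = {
    // Rename the key on the lookup side so column references stay unambiguous.
    val rules = addresses.withColumnRenamed("Address_Id", "Rule_Address_Id")

    // A lookup row is a candidate if its key equals the employee's key or is REST.
    val candidates = employees
      .join(
        rules,
        col("Address_Id") === col("Rule_Address_Id") || col("Rule_Address_Id") === "REST",
        "inner")
      // Assumed priority: 0 for an exact match, 1 for the wildcard.
      .withColumn("priority", when(col("Rule_Address_Id") === "REST", 1).otherwise(0))

    // Rank candidates per employee; City_Name is an assumed tie-breaker.
    val perEmployee = Window
      .partitionBy("Emp_Id")
      .orderBy(col("priority"), col("City_Name"))

    candidates
      .withColumn("rn", row_number().over(perEmployee))
      .where(col("rn") === 1)
      .select("Emp_Id", "Emp_Name", "Address_Id", "City_Name")
      .orderBy("Emp_Id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("first-match-join").getOrCreate()
    import spark.implicits._

    val df_1 = Seq(("1", "ABC", "1"), ("2", "DEF", "2"), ("3", "PQR", "3"), ("4", "XYZ", "1"))
      .toDF("Emp_Id", "Emp_Name", "Address_Id")
    val df_2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST", "Some_City"))
      .toDF("Address_Id", "City_Name")

    firstMatch(df_1, df_2).show(false)
    spark.stop()
  }
}
```

Because the join is an inner join, an employee with no exact match is dropped if the lookup table has no REST row. The non-equi join condition also prevents a hash join, so this sketch suits a small lookup table.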


3 Comments

First, apologies for the late reply. This answer helped me a lot.
@PrateekPathak Not a problem and I'm happy to help. Feel free to upvote the answer as well ;)
Upvoted the answer (not sure whether "upvoted" is the correct word). Thanks for letting me know about this, as I am a new user.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object JoinTwoDataFrame extends App {

      val spark = SparkSession.builder()
        .master("local")
        .appName("DataFrame-example")
        .getOrCreate()

      import spark.implicits._

      val df1 = Seq(
        (1, "ABC", "1"),
        (2, "DEF", "2"),
        (3, "PQR", "3"),
        (4, "XYZ", "1")
      ).toDF("Emp_id", "Emp_Name", "Address_id")

      val df2 = Seq(
        ("1", "City_1"),
        ("1", "City_2"),
        ("2", "City_3"),
        ("REST", "Some_City")
      ).toDF("Address_id", "City")

      // Read the wildcard city once; first() throws if df2 has no REST row.
      val restCity: Option[String] =
        Some(df2.filter($"Address_id".equalTo("REST")).select($"City").first()(0).toString)

      val res = df1.join(df2, df1.col("Address_id") === df2.col("Address_id"), "left_outer")
        .select(
          df1.col("Emp_id"),
          df1.col("Emp_Name"),
          df1.col("Address_id"),
          df2.col("City")
        )
        // Employees without an exact match get the REST city.
        .withColumn("city2", when($"City".isNotNull, $"City").otherwise(restCity.getOrElse("")))
        .drop("City")
        .withColumnRenamed("city2", "City")
        .orderBy("Address_id", "City")
        .groupBy("Emp_id", "Emp_Name", "Address_id")
        // Keep the first collected city per employee.
        .agg(collect_list("City").alias("cityList"))
        .withColumn("City", $"cityList".getItem(0))
        .drop("cityList")
        .orderBy("Emp_id")

      res.show(false)
      // +------+--------+----------+---------+
      // |Emp_id|Emp_Name|Address_id|City     |
      // +------+--------+----------+---------+
      // |1     |ABC     |1         |City_1   |
      // |2     |DEF     |2         |City_3   |
      // |3     |PQR     |3         |Some_City|
      // |4     |XYZ     |1         |City_1   |
      // +------+--------+----------+---------+
    }
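The REST handling in the code above comes down to three steps: read the wildcard city out of df2 as a plain value, left-join df1 to the remaining rows on exact `Address_id` equality, and fill the cities that come back null with the wildcard value. A minimal sketch of just those steps follows. It assumes `min("City")` is an acceptable way to pick one city when an address has several, which the original answer does not state. `RestFallback` and `withRestFallback` are illustrative names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit, min}

object RestFallback {

  def withRestFallback(df1: DataFrame, df2: DataFrame): DataFrame = {
    // Step 1: pull the wildcard city out as a Scala value; None if there is no REST row.
    val restCity: Option[String] = df2
      .filter(col("Address_id") === "REST")
      .select("City")
      .collect()
      .headOption
      .map(_.getString(0))

    // Step 2: one city per concrete Address_id (min is an assumed tie-break).
    val exact = df2
      .filter(col("Address_id") =!= "REST")
      .groupBy("Address_id")
      .agg(min("City").as("City"))

    // Step 3: a left join keeps every employee; unmatched rows have a null City,
    // which coalesce replaces with the wildcard city (or leaves null if none exists).
    df1.join(exact, Seq("Address_id"), "left_outer")
      .withColumn("City", coalesce(col("City"), lit(restCity.orNull)))
      .select("Emp_id", "Emp_Name", "Address_id", "City")
      .orderBy("Emp_id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rest-fallback").getOrCreate()
    import spark.implicits._

    val df1 = Seq((1, "ABC", "1"), (2, "DEF", "2"), (3, "PQR", "3"), (4, "XYZ", "1"))
      .toDF("Emp_id", "Emp_Name", "Address_id")
    val df2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST", "Some_City"))
      .toDF("Address_id", "City")

    withRestFallback(df1, df2).show(false)
    spark.stop()
  }
}
```

Reducing df2 to one row per address before the join keeps the join an ordinary equi-join and avoids relying on row order after a `groupBy`.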

1 Comment

I am not able to follow the logic for filtering the "REST" record. Please explain.
