
I am new to Apache Spark and need some help. How do I correctly join the following two DataFrames?

First dataframe:

| DATE_TIME           | PHONE_NUMBER |
|---------------------|--------------|
| 2019-01-01 00:00:00 | 7056589658   |
| 2019-02-02 00:00:00 | 7778965896   |

Second dataframe:

| DATE_TIME           | IP            |
|---------------------|---------------|
| 2019-01-01 01:00:00 | 194.67.45.126 |
| 2019-02-02 00:00:00 | 102.85.62.100 |
| 2019-03-03 03:00:00 | 102.85.62.100 |

The final dataframe which I want:

| DATE_TIME           | PHONE_NUMBER | IP            |
|---------------------|--------------|---------------|
| 2019-01-01 00:00:00 | 7056589658   |               |
| 2019-01-01 01:00:00 |              | 194.67.45.126 |
| 2019-02-02 00:00:00 | 7778965896   | 102.85.62.100 |
| 2019-03-03 03:00:00 |              | 102.85.62.100 |

Here is the code I tried:

```scala
import org.apache.spark.sql.Dataset
import spark.implicits._

val df1 = Seq(
  ("2019-01-01 00:00:00", "7056589658"),
  ("2019-02-02 00:00:00", "7778965896")
).toDF("DATE_TIME", "PHONE_NUMBER")
df1.show()

val df2 = Seq(
  ("2019-01-01 01:00:00", "194.67.45.126"),
  ("2019-02-02 00:00:00", "102.85.62.100"),
  ("2019-03-03 03:00:00", "102.85.62.100")
).toDF("DATE_TIME", "IP")
df2.show()

val total = df1.join(df2, Seq("DATE_TIME"), "left_outer")
total.show()
```

Unfortunately, it raises an error:

```
org.apache.spark.SparkException: Exception thrown in awaitResult:
  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
  at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
  ...
```
It looks correct to me, except that the join type needs to be full to get the desired result. Maybe the issue is in the configuration; could you post the full stack trace? – Commented May 2, 2019 at 5:58

2 Answers


You need a full outer join, but otherwise your code is good. Your issue is probably something else, but the stack trace you posted isn't enough to conclude what it is.

```scala
val total = df1.join(df2, Seq("DATE_TIME"), "full_outer")
```
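For reference, here is a runnable sketch of the full outer join on the sample data from the question (assuming a `SparkSession` named `spark` is in scope, as in a shell or Zeppelin; the `orderBy` is added only to make the output order deterministic):

```scala
import spark.implicits._

val df1 = Seq(
  ("2019-01-01 00:00:00", "7056589658"),
  ("2019-02-02 00:00:00", "7778965896")
).toDF("DATE_TIME", "PHONE_NUMBER")

val df2 = Seq(
  ("2019-01-01 01:00:00", "194.67.45.126"),
  ("2019-02-02 00:00:00", "102.85.62.100"),
  ("2019-03-03 03:00:00", "102.85.62.100")
).toDF("DATE_TIME", "IP")

// A full outer join keeps unmatched rows from BOTH sides,
// filling the missing column with null.
val total = df1.join(df2, Seq("DATE_TIME"), "full_outer").orderBy("DATE_TIME")
total.show()
// Expected rows, ordered by DATE_TIME:
//   2019-01-01 00:00:00 | 7056589658 | null
//   2019-01-01 01:00:00 | null       | 194.67.45.126
//   2019-02-02 00:00:00 | 7778965896 | 102.85.62.100
//   2019-03-03 03:00:00 | null       | 102.85.62.100
```

Because the join key is given as `Seq("DATE_TIME")`, Spark emits a single `DATE_TIME` column instead of one per side.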

1 Comment

I tested the code in Zeppelin and found the cause of the error: in my case I had selected the wrong interpreter binding at the start. full_outer is exactly what I need. Thank you for your answer.

You can do this:

```scala
val total = df1.join(df2, df1("DATE_TIME") === df2("DATE_TIME"), "left_outer")
```
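One caveat with this form: joining on the expression `df1("DATE_TIME") === df2("DATE_TIME")` keeps both `DATE_TIME` columns in the result, and `left_outer` drops the rows that exist only in `df2`. To reproduce the single-column output the question asks for with the expression form, you can use `full_outer` and coalesce the two keys; a sketch, assuming `df1` and `df2` are defined as in the question:

```scala
import org.apache.spark.sql.functions.coalesce

val total = df1
  .join(df2, df1("DATE_TIME") === df2("DATE_TIME"), "full_outer")
  .select(
    // Merge the two key columns into one: take df1's key where
    // a match (or df1-only row) exists, otherwise df2's key.
    coalesce(df1("DATE_TIME"), df2("DATE_TIME")).as("DATE_TIME"),
    df1("PHONE_NUMBER"),
    df2("IP")
  )
```

With `Seq("DATE_TIME")` as in the accepted answer, Spark does this key deduplication for you, so that form is usually simpler when the key columns share a name.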
