
There are two DataFrames (Scala, Apache Spark 1.6.1)

1) Matches

 MatchID | Player1    | Player2
 --------------------------------
 1       | John Wayne | John Doe
 2       | Ive Fish   | San Simon

2) Personal Data

 Player     | BirthYear
 -----------------------
 John Wayne | 1986
 Ive Fish   | 1990
 San Simon  | 1974
 John Doe   | 1995

How can I create a new DataFrame with 'BirthYear' for both players?

 MatchID | Player1    | Player2   | BYear_P1 | BYear_P2 | Diff
 -------------------------------------------------------------
 1       | John Wayne | John Doe  | 1986     | 1995     | 9
 2       | Ive Fish   | San Simon | 1990     | 1974     | 16

I tried

 val df = MatchesDF.join(PersonalDF, MatchesDF("Player1") === PersonalDF("Player")) 

then join again for the second player

 val resDf = df.join(PersonalDF, df("Player2") === PersonalDF("Player")) 

but it is a VERY time-consuming operation.

Is there another way to do this in Scala and Apache Spark?
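When one side of the join is small (a players table usually is), a common alternative is a map-side join: keep the small table in memory as a lookup and enrich each match row in a single pass, instead of shuffling for two separate joins. A minimal plain-Scala sketch of the idea, using the sample rows from the question (in Spark itself the small table would be broadcast, not a local `Map`):

```scala
// Sketch of the map-side join idea in plain Scala. In Spark you would
// broadcast the small players table; here a Map plays that role.
case class MatchRow(matchId: Int, player1: String, player2: String)

val matches = Seq(
  MatchRow(1, "John Wayne", "John Doe"),
  MatchRow(2, "Ive Fish", "San Simon")
)

// Small lookup table: player name -> birth year (the "broadcast" side).
val birthYear: Map[String, Int] = Map(
  "John Wayne" -> 1986,
  "Ive Fish"   -> 1990,
  "San Simon"  -> 1974,
  "John Doe"   -> 1995
)

// Each match row picks up both birth years in a single pass.
val enriched = matches.map { m =>
  val b1 = birthYear(m.player1)
  val b2 = birthYear(m.player2)
  (m.matchId, m.player1, m.player2, b1, b2, math.abs(b1 - b2))
}

enriched.foreach(println)
// (1,John Wayne,John Doe,1986,1995,9)
// (2,Ive Fish,San Simon,1990,1974,16)
```

In Spark 1.6 the analogous hint is wrapping the small DataFrame in `org.apache.spark.sql.functions.broadcast(...)` inside the join, which should let the planner ship the small table to every executor instead of shuffling the large one.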

4 Answers


This is a solution using Spark's DataFrame functions:

 import sqlContext.implicits._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.abs
 import org.apache.spark.sql.types._

 val matches = sqlContext.sparkContext.parallelize(Seq(
   Row(1, "John Wayne", "John Doe"),
   Row(2, "Ive Fish", "San Simon")
 ))

 val players = sqlContext.sparkContext.parallelize(Seq(
   Row("John Wayne", 1986),
   Row("Ive Fish", 1990),
   Row("San Simon", 1974),
   Row("John Doe", 1995)
 ))

 val matchesDf = sqlContext.createDataFrame(matches, StructType(Seq(
   StructField("matchId", IntegerType, nullable = false),
   StructField("player1", StringType, nullable = false),
   StructField("player2", StringType, nullable = false)
 ))).as('matches)

 val playersDf = sqlContext.createDataFrame(players, StructType(Seq(
   StructField("player", StringType, nullable = false),
   StructField("birthYear", IntegerType, nullable = false)
 ))).as('players)

 matchesDf
   .join(playersDf, $"matches.player1" === $"players.player")
   .select($"matches.matchId" as "matchId",
           $"matches.player1" as "player1",
           $"matches.player2" as "player2",
           $"players.birthYear" as "player1BirthYear")
   .join(playersDf, $"player2" === $"players.player")
   .select($"matchId" as "MatchID",
           $"player1" as "Player1",
           $"player2" as "Player2",
           $"player1BirthYear" as "BYear_P1",
           $"players.birthYear" as "BYear_P2")
   .withColumn("Diff", abs('BYear_P2.minus('BYear_P1)))
   .show()

 +-------+----------+---------+--------+--------+----+
 |MatchID|   Player1|  Player2|BYear_P1|BYear_P2|Diff|
 +-------+----------+---------+--------+--------+----+
 |      1|John Wayne| John Doe|    1986|    1995|   9|
 |      2|  Ive Fish|San Simon|    1990|    1974|  16|
 +-------+----------+---------+--------+--------+----+

1 Comment

I like that you completely avoided SQL statements! +1

This should perform better:

 case class Match(matchId: Int, player1: String, player2: String)
 case class Player(name: String, birthYear: Int)

 val matches = Seq(
   Match(1, "John Wayne", "John Doe"),
   Match(2, "Ive Fish", "San Simon")
 )
 val players = Seq(
   Player("John Wayne", 1986),
   Player("Ive Fish", 1990),
   Player("San Simon", 1974),
   Player("John Doe", 1995)
 )

 val matchesDf = sqlContext.createDataFrame(matches)
 val playersDf = sqlContext.createDataFrame(players)

 matchesDf.registerTempTable("matches")
 playersDf.registerTempTable("players")

 sqlContext.sql(
   "select matchId, player1, player2, p1.birthYear, p2.birthYear, abs(p1.birthYear-p2.birthYear) " +
   "from matches m inner join players p1 inner join players p2 " +
   "where m.player1 = p1.name and m.player2 = p2.name").show()

 +-------+----------+---------+---------+---------+---+
 |matchId|   player1|  player2|birthYear|birthYear|_c5|
 +-------+----------+---------+---------+---------+---+
 |      1|John Wayne| John Doe|     1986|     1995|  9|
 |      2|  Ive Fish|San Simon|     1990|     1974| 16|
 +-------+----------+---------+---------+---------+---+

I didn't find a way to express a join of 3 tables in the Scala DSL.

5 Comments

Again doing two joins, how does that make it better?
This runs in about 2 minutes for a Matches table with ~10,000 rows and a Player table with ~700 records.
Use DataFrames for your joins instead of plain SQL for better performance.
@dheee Facing a similar issue: stackoverflow.com/questions/38149483/… Any idea?
If you have more than 2 DataFrames you want to join, would it be faster to do it in one query or in several join/select steps?

In Spark 2.0 and above, Spark provides several signatures to join two DataFrames:

 join(right: Dataset[_]): DataFrame
 join(right: Dataset[_], usingColumn: String): DataFrame
 join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
 join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
 join(right: Dataset[_], joinExprs: Column): DataFrame
 join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

All these join methods are available on the Dataset class, and they all return a DataFrame (note that DataFrame = Dataset[Row]).

All these methods take a Dataset[_] as their first argument, so they also accept a DataFrame.

To explain how to join, I will use emp and dept DataFrames:

 empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
   .show(false)

If the join column names are the same on both DataFrames, you can even omit the join expression.
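What all these equi-join overloads express can be sketched in plain Scala: pair every left row with every right row that has the same key, keeping the key only once, as `join(right, usingColumns)` does. The `innerJoin` helper below is hypothetical, purely for illustration, and not part of the Spark API:

```scala
// Hypothetical helper illustrating equi-join semantics (not a Spark API).
def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, A, B)] =
  for {
    (k, a)  <- left
    (k2, b) <- right
    if k == k2          // the equi-join condition
  } yield (k, a, b)     // the join key appears once, as with usingColumns

val emp  = Seq((10, "Alice"), (20, "Bob"))      // (dept_id, name)
val dept = Seq((10, "Sales"), (30, "Finance"))  // (dept_id, dept_name)

println(innerJoin(emp, dept))
// List((10,Alice,Sales))  -- Bob's dept 20 has no match, so it is dropped
```

An "inner" join drops unmatched rows on both sides; the `joinType: String` parameter ("left_outer", "right_outer", "full_outer", etc.) controls whether and how unmatched rows are kept instead.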

Comments

 val df = left.join(right, Seq("name"))
 display(df)

4 Comments

Hello and welcome to StackOverflow. Please add some explanation to your answer so it becomes more valuable for other users. See stackoverflow.com/help/how-to-answer
This information is insufficient to provide any sort of help. What is "left"? What is "right"? Please rephrase your answer.
There's no such function as display in Spark Dataframe (Scala implementation)
I do like the syntax of using Seq("column_name") as the join condition; it's what my colleagues all use and it's readable. For explanation: I think left is one DataFrame and right is another, so this is not a left-join or right-join situation.
