
The use case is to self-join a table multiple times.

// Hive Table
val network_file = spark.sqlContext.sql("SELECT * FROM test.network_file")

// Cache
network_file.cache()
network_file.createOrReplaceTempView("network_design")

The following query then self-joins the table multiple times.

val res = spark.sqlContext.sql("""
  SELECT one.sourcehub AS source,
         one.mappedhub AS first_leg,
         two.mappedhub AS second_leg,
         one.destinationhub AS dest
  FROM (SELECT * FROM network_design) one
  JOIN (SELECT * FROM network_design) two
  JOIN (SELECT * FROM network_design) three
    ON (two.sourcehub = one.mappedhub)
   AND (three.sourcehub = two.mappedhub)
   AND (one.destinationhub = two.destinationhub)
   AND (two.destinationhub = three.destinationhub)
  GROUP BY source, first_leg, second_leg, dest
""")

The problem is that the physical plan of the above query shows the table being read three times.

== Physical Plan ==
*HashAggregate(keys=[sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84], functions=[])
+- Exchange hashpartitioning(sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84, 200)
   +- *HashAggregate(keys=[sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84], functions=[])
      +- *Project [sourcehub#83, destinationhub#84, mappedhub#85, mappedhub#109]
         +- *BroadcastHashJoin [mappedhub#109, destinationhub#108], [sourcehub#110, destinationhub#111], Inner, BuildRight
            :- *Project [sourcehub#83, destinationhub#84, mappedhub#85, destinationhub#108, mappedhub#109]
            :  +- *BroadcastHashJoin [mappedhub#85, destinationhub#84], [sourcehub#107, destinationhub#108], Inner, BuildRight
            :     :- *Filter (isnotnull(destinationhub#84) && isnotnull(mappedhub#85))
            :     :  +- InMemoryTableScan [sourcehub#83, destinationhub#84, mappedhub#85], [isnotnull(destinationhub#84), isnotnull(mappedhub#85)]
            :     :        +- InMemoryRelation [sourcehub#83, destinationhub#84, mappedhub#85], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :     :              +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
            :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, string, false]))
            :        +- *Filter ((isnotnull(sourcehub#107) && isnotnull(destinationhub#108)) && isnotnull(mappedhub#109))
            :           +- InMemoryTableScan [sourcehub#107, destinationhub#108, mappedhub#109], [isnotnull(sourcehub#107), isnotnull(destinationhub#108), isnotnull(mappedhub#109)]
            :                 +- InMemoryRelation [sourcehub#107, destinationhub#108, mappedhub#109], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, string, false]))
               +- *Filter (isnotnull(sourcehub#110) && isnotnull(destinationhub#111))
                  +- InMemoryTableScan [sourcehub#110, destinationhub#111], [isnotnull(sourcehub#110), isnotnull(destinationhub#111)]
                        +- InMemoryRelation [sourcehub#110, destinationhub#111, mappedhub#112], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]

Shouldn't Spark cache the table once instead of reading it multiple times? How can we efficiently cache tables in Spark for self-join cases like this?

Spark version: 2.2. The downstream store is Hive with ORC.

  • Why are you using spark.sqlContext.sql? You can use spark.sql directly. Commented Apr 18, 2018 at 20:38
  • I think the physical plan is what is actually executed in the end: jaceklaskowski.gitbooks.io/mastering-spark-sql/… Here is the complete plan: pastebin.com/muPamdNA Commented Apr 19, 2018 at 9:11
  • Agreed, you were correct. Pardon me; I just checked the Spark documentation as well. Let me delete that comment so that people are not misled. Commented Apr 19, 2018 at 16:55

1 Answer


This sequence of statements ignores the cached DataFrame that cache() returns:

network_file.cache() // the result of this call is not used at all
network_file.createOrReplaceTempView("network_design") // doesn't have the cached DF in its lineage

You should either keep the cached DataFrame in the variable or register the view on the DataFrame that cache() returns:

// chain .cache() at the definition (a plain val cannot be reassigned in Scala)
val network_file = spark.sqlContext.sql("SELECT * FROM test.network_file").cache()
network_file.createOrReplaceTempView("network_design")

Or:

network_file.cache().createOrReplaceTempView("network_design") 
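
To sanity-check that the view is actually backed by the cache, here is a minimal sketch (assuming a Spark 2.x session with spark in scope; note that cache() is lazy, so nothing is materialized until the first action):

// Should return true once the view's plan is registered with the cache manager
spark.catalog.isCached("network_design")

// Trigger materialization, then inspect the plan
spark.table("network_design").count()
spark.table("network_design").explain()

A cached read surfaces as InMemoryRelation / InMemoryTableScan nodes in the explain output.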

1 Comment

Thanks, I tried your suggestion and I think the DataFrame is getting cached now. But why does the physical plan have no indication of the table being read from the cached source? I couldn't see a difference in the physical plan when caching the table.
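
For reference, the cache does leave a trace in the plan: the InMemoryRelation and InMemoryTableScan operators in the output above are what a cached read looks like, whereas an uncached read would bottom out directly in HiveTableScan. A quick way to see the difference, as a sketch under the same session assumptions (the val name cached is just for illustration):

// Uncached: explain output ends in HiveTableScan
spark.sql("SELECT * FROM test.network_file").explain()

// Cached: the Hive scan is wrapped in InMemoryRelation / InMemoryTableScan
val cached = spark.sql("SELECT * FROM test.network_file").cache()
cached.explain()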

