The use case is to self-join a table multiple times.
```scala
// Hive Table
val network_file = spark.sqlContext.sql("SELECT * FROM test.network_file")

// Cache
network_file.cache()
network_file.createOrReplaceTempView("network_design")
```
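One note on the setup: `cache()` is lazy, so the `InMemoryRelation` is only populated when the first action runs against the data. A minimal way to materialize it eagerly (any full-scan action works; `count()` is just the conventional choice):

```scala
// cache() only marks the plan for caching; an action is needed to
// actually populate the InMemoryRelation before the joins run.
network_file.count()
```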
The following query then self-joins the cached view multiple times:

```scala
val res = spark.sqlContext.sql("""
  select one.sourcehub as source,
         one.mappedhub as first_leg,
         two.mappedhub as second_leg,
         one.destinationhub as dest
  from (select * from network_design) one
  JOIN (select * from network_design) two
  JOIN (select * from network_design) three
    ON (two.sourcehub = one.mappedhub)
   AND (three.sourcehub = two.mappedhub)
   AND (one.destinationhub = two.destinationhub)
   AND (two.destinationhub = three.destinationhub)
  group by source, first_leg, second_leg, dest
""")
```

The problem is that the physical plan for this query shows the table being scanned three times:
```
== Physical Plan ==
*HashAggregate(keys=[sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84], functions=[])
+- Exchange hashpartitioning(sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84, 200)
   +- *HashAggregate(keys=[sourcehub#83, mappedhub#85, mappedhub#109, destinationhub#84], functions=[])
      +- *Project [sourcehub#83, destinationhub#84, mappedhub#85, mappedhub#109]
         +- *BroadcastHashJoin [mappedhub#109, destinationhub#108], [sourcehub#110, destinationhub#111], Inner, BuildRight
            :- *Project [sourcehub#83, destinationhub#84, mappedhub#85, destinationhub#108, mappedhub#109]
            :  +- *BroadcastHashJoin [mappedhub#85, destinationhub#84], [sourcehub#107, destinationhub#108], Inner, BuildRight
            :     :- *Filter (isnotnull(destinationhub#84) && isnotnull(mappedhub#85))
            :     :  +- InMemoryTableScan [sourcehub#83, destinationhub#84, mappedhub#85], [isnotnull(destinationhub#84), isnotnull(mappedhub#85)]
            :     :        +- InMemoryRelation [sourcehub#83, destinationhub#84, mappedhub#85], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :     :              +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
            :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, string, false]))
            :        +- *Filter ((isnotnull(sourcehub#107) && isnotnull(destinationhub#108)) && isnotnull(mappedhub#109))
            :           +- InMemoryTableScan [sourcehub#107, destinationhub#108, mappedhub#109], [isnotnull(sourcehub#107), isnotnull(destinationhub#108), isnotnull(mappedhub#109)]
            :                 +- InMemoryRelation [sourcehub#107, destinationhub#108, mappedhub#109], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false], input[1, string, false]))
               +- *Filter (isnotnull(sourcehub#110) && isnotnull(destinationhub#111))
                  +- InMemoryTableScan [sourcehub#110, destinationhub#111], [isnotnull(sourcehub#110), isnotnull(destinationhub#111)]
                        +- InMemoryRelation [sourcehub#110, destinationhub#111, mappedhub#112], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                              +- HiveTableScan [sourcehub#0, destinationhub#1, mappedhub#2], HiveTableRelation `test`.`network_file`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [sourcehub#0, destinationhub#1, mappedhub#2]
```

Shouldn't Spark cache the table once instead of reading it multiple times? How can we cache tables efficiently in Spark for self-join cases like this?
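For what it's worth, the Catalog API can confirm the view is backed by the cache; a quick check (assuming the temp view registered above):

```scala
// isCached reports whether the plan behind the named table/view is
// registered in the cache; expected to be true after cache() above.
println(spark.catalog.isCached("network_design"))
```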
Spark version: 2.2. Hive ORC is the downstream store.
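In case it helps, the same query expressed through the DataFrame API against the single cached DataFrame shows the same shape, since each join branch plans its own scan of the cached relation. A minimal sketch (the aliases `one`/`two`/`three` and the name `res2` are just illustrative):

```scala
import org.apache.spark.sql.functions.col

// Self-join the one cached DataFrame via aliases instead of the SQL view.
val one   = network_file.alias("one")
val two   = network_file.alias("two")
val three = network_file.alias("three")

val res2 = one
  .join(two, col("two.sourcehub") === col("one.mappedhub") &&
             col("one.destinationhub") === col("two.destinationhub"))
  .join(three, col("three.sourcehub") === col("two.mappedhub") &&
               col("two.destinationhub") === col("three.destinationhub"))
  .select(
    col("one.sourcehub").as("source"),
    col("one.mappedhub").as("first_leg"),
    col("two.mappedhub").as("second_leg"),
    col("one.destinationhub").as("dest"))
  .distinct()  // mirrors the GROUP BY with no aggregate functions

res2.explain()  // expected: the same three InMemoryTableScan branches
```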