The Evolution of a Relational Database Layer over HBase @ApachePhoenix http://phoenix.apache.org/ James Taylor (@JamesPlusPlus) V5
About James • Architect at Salesforce.com – Part of the Big Data group • Lead of Apache Phoenix project • PMC member of Apache Calcite • Engineer and Product Manager at BEA Systems – XQuery-based federated query engine – SQL-based complex event processing engine • Various startups prior to that
Agenda • What is Apache Phoenix? • State of the Union • A Deeper Look – Joins and Subquery Support • What’s New? • What’s Next? • Q&A
What is Apache Phoenix? • A relational database layer for Apache HBase – Query engine • Transforms SQL queries into native HBase API calls • Pushes as much work as possible onto the cluster for parallel execution – Metadata repository • Typed access to data stored in HBase tables – A JDBC driver • A top level Apache Software Foundation project – Originally developed at Salesforce – Now a top-level project at the ASF (Happy Birthday!) – A growing community with momentum
Where Does Phoenix Fit In? Sqoop RDBDataCollector Flume LogDataCollector Zookeeper Coordination YARN (MRv2) Cluster Resource Manager / MapReduce HDFS 2.0 Hadoop Distributed File System GraphX Graph analysis framework Phoenix Query execution engine HBase Distributed Database The Java Virtual Machine Hadoop Common JNI Spark Iterative In-Memory Computation MLLib Data mining Pig Data Manipulation Hive Structured Query Phoenix JDBC client
State of the Union • Broad enough SQL support to run TPC queries – Joins, Sub-queries, Derived tables, etc. • Three different secondary indexing strategies – Immutable for write-once/append only data – Global for read-heavy mutable data – Local for write-heavy mutable or immutable data • Statistics driven parallel execution • Tracing and metrics for Monitoring & Management
Join and Subquery Support • Grammar: inner join; left/right/full outer join; cross join • Additional: semi join; anti join • Algorithms: hash-join; sort-merge-join • Optimizations: – Predicate push-down – FK-to-PK join optimization – Global index with missing data columns – Correlated subquery rewrite
TPC Example 1 Small-Quantity-Order Revenue Query (Q17) select sum(l_extendedprice) / 7.0 as avg_yearly from lineitem, part where p_partkey = l_partkey and p_brand = '[B]' and p_container = '[C]' and l_quantity < ( select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey ); CLIENT 4-WAY FULL SCAN OVER lineitem PARALLEL INNER JOIN TABLE 0 CLIENT 1-WAY FULL SCAN OVER part SERVER FILTER BY p_partkey = ‘[B]’ AND p_container = ‘[C]’ PARALLEL INNER JOIN TABLE 1 CLIENT 4-WAY FULL SCAN OVER lineitem SERVER AGGREGATE INTO DISTINCT ROWS BY l_partkey AFTER-JOIN SERVER FILTER BY l_quantity < $0
TPC Example 2 Order Priority Checking Query (Q4) select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date '[D]' and o_orderdate < date '[D]' + interval '3' month and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority; CLIENT 4-WAY FULL SCAN OVER orders SERVER FILTER o_orderdate >= ‘[D]’ AND o_orderdate < ‘[D]’ + 3(d) SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY o_orderpriority CLIENT MERGE SORT SKIP-SCAN JOIN TABLE 0 CLIENT 4-WAY FULL SCAN OVER lineitem SERVER FILTER BY l_commitdate < l_receiptdate DYNAMIC SERVER FILTER BY o_orderkey IN l_orderkey
Join support - what can't we do? • Nested Loop Join • Statistics Guided Join Algorithm – Smartly choose the smaller table for the build side – Smartly switch between hash-join and sort-merge-join – Smartly turn on/off FK-to-PK join optimization
What’s New? • HBase 1.0 Support • Functional Indexes
Functional Indexes • Creating an index on an expression as opposed to just a column value. For example, the following will be a full table scan: SELECT AVG(response_time) FROM SERVER_METRICS WHERE DAYOFMONTH(create_time) = 1 • Adding the following functional index will turn it into a range scan: CREATE INDEX day_of_month_idx ON SERVER_METRICS (DAYOFMONTH(create_time)) INCLUDE (response_time)
What’s New? • HBase 1.0 Support • Functional Indexes • User Defined Functions
User Defined Functions • Extension points to Phoenix for domain-specific functions. For example, a geo-location application might load a set of UDFs like this: CREATE FUNCTION WOEID_DISTANCE(INTEGER,INTEGER) RETURNS INTEGER AS ‘org.apache.geo.woeidDistance’ USING JAR ‘/lib/geo/geoloc.jar’ • Querying, functional indexing, etc. then possible: SELECT * FROM woeid a JOIN woeid b ON a.country = b.country WHERE woeid_distance(a.ID,b.ID) < 5
What’s New? • HBase 1.0 Support • Functional Indexes • User Defined Functions • Query Server with Thin Driver
Query Server + Thin Driver • Offloads query planning and execution to different server(s) • Minimizes client dependencies – Enabler for ODBC driver (not available yet, though) • Connect like this instead: Connection conn = DriverManager.getConnection( “jdbc:phoenix:thin:url=http://localhost:8765”); • Still evolving, so no backward compatibility guarantees yet • For more information, see http://phoenix.apache.org/server.html
What’s New? • HBase 1.0 Support • Functional Indexes • User Defined Functions • Query Server with Thin Driver • Union All support • Testing at scale with Pherf • MR index build • Spark integration • Date built-in functions – WEEK, DAYOFMONTH, etc. • Transactions (WIP - will be in next release)
Transactions • Snapshot isolation model – Using Tephra (http://tephra.io/) – Supports REPEABLE_READ isolation level – Allows reading your own uncommitted data • Optional – Enabled on a table by table basis – No performance penalty when not used • Work in progress, but close to release – Try our txn branch – Will be available in next release
Optimistic Concurrency Control • Avoids cost of locking rows and tables • No deadlocks or lock escalations • Cost of conflict detection and possible rollback is higher • Good if conflicts are rare: short transaction, disjoint partitioning of work • Conflict detection not always necessary: write-once/append- only data
Tephra Architecture ZooKeeper Tx Manager (standby) HBase Master 1 Master 2 RS 1 RS 2 RS 4 RS 3 Client 1 Client 2 Client N Tx Manager (active)
time out try abort failed roll back in HBase write to HBase do work Client Tx Manager none complete V abortsucceeded in progress start tx start start tx commit try commit check conflicts invalid X invalidate failed Transaction Lifecycle
Tephra Architecture • TransactionAware client • Coordinates transaction lifecycle with manager • Communicates directly with HBase for reads and writes • Transaction Manager • Assigns transaction IDs • Maintains state on in-progress, committed and invalid transactions • Transaction Processor coprocessor • Applies server-side filtering for reads • Cleans up data from failed transactions, and no longer visible versions
What’s New? • HBase 1.0 Support • Functional Indexes • User Defined Functions • Query Server with Thin Driver • Union All support • Testing at scale with Pherf • MR index build • Spark integration • Date built-in functions – WEEK, DAYOFMONTH, etc. • Transactions (WIP - will be in next release)
What’s Next? • Is Phoenix done? • What about the Big Picture? – How can Phoenix be leveraged in the larger ecosystem? – Hive, Pig, Spark, MR integration with Phoenix exists today, but not a great story
What’s Next? You are here
Introducing Apache Calcite • Query parser, compiler, and planner framework – SQL-92 compliant (ever argue SQL with Julian? :-) ) – Enables Phoenix to get missing SQL support • Pluggable cost-based optimizer framework – Sane way to model push down through rules • Interop with other Calcite adaptors – Not for free, but it becomes feasible – Already used by Drill, Hive, Kylin, Samza – Supports any JDBC source (i.e. RDBMS - remember them :-) ) – One cost-model to rule them all
How does Phoenix plug in? Calcite Parser & Validator Calcite Query Optimizer Phoenix Query Plan Generator Phoenix Runtime Phoenix Tables over HBase JDBC Client SQL + Phoenix specific grammar Built-in rules + Phoenix specific rules
Optimization Rules • AggregateRemoveRule • FilterAggregateTransposeRule • FilterJoinRule • FilterMergeRule • JoinCommuteRule • PhoenixFilterScanMergeRule • PhoenixJoinSingleAggregateMergeRule • …
Query Example (filter push-down and smart join algorithm) LogicalFilter filter: $0 = ‘x’ LogicalJoin type: inner cond: $3 = $7 LogicalProject projects: $0, $5 LogicalTableScan table: A LogicalTableScan table: B PhoenixTableScan table: ‘a’ filter: $0 = ‘x’ PhoenixServerJoin type: inner cond: $3 = $1 PhoenixServerProject projects: $2, $0 Optimizer (with RelOptRules & ConvertRules) PhoenixTableScan table: ‘b’ PhoenixServerProject projects: $0, $2 PhoenixServerProject projects: $0, $3
Query Example (filter push-down and smart join algorithm) ScanPlan table: ‘a’ skip-scan: pk0 = ‘x’ projects: pk0, c3 HashJoinPlan types {inner} join-keys: {$1} projects: $2, $0 Build hash-key: $1 Phoenix Implementor PhoenixTableScan table: ‘a’ filter: $0 = ‘x’ PhoenixServerJoin type: inner cond: $3 = $1 PhoenixServerProject projects: $2, $0 PhoenixTableScan table: ‘b’ PhoenixServerProject projects: $0, $2 PhoenixServerProject projects: $0, $3 ScanPlan table: ‘b’ projects: col0, col2 Probe
Interoperibility Example • Joining data from Phoenix and mySQL EnumerableJoin PhoenixTableScan JdbcTableScan Phoenix Tables over HBase mySQL Database PhoenixToEnumerable Converter JdbcToEnumerable Converter
Query Example 1 WITH m AS (SELECT * FROM dept_manager dm WHERE from_date = (SELECT max(from_date) FROM dept_manager dm2 WHERE dm.dept_no = dm2.dept_no)) SELECT m.dept_no, d.dept_name, e.first_name, e.last_name FROM employees e JOIN m ON e.emp_no = m.emp_no JOIN departments d ON d.dept_no = m.dept_no ORDER BY d.dept_no;
Query Example 2 SELECT dept_no, title, count(*) FROM titles t JOIN dept_emp de ON t.emp_no = de.emp_no WHERE dept_no <= 'd006' GROUP BY rollup(dept_no, title) ORDER BY dept_no, title;
Thank you! Questions? * who uses Phoenix

The Evolution of a Relational Database Layer over HBase

  • 1.
    The Evolution ofa Relational Database Layer over HBase @ApachePhoenix http://phoenix.apache.org/ James Taylor (@JamesPlusPlus) V5
  • 2.
    About James • Architectat Salesforce.com – Part of the Big Data group • Lead of Apache Phoenix project • PMC member of Apache Calcite • Engineer and Product Manager at BEA Systems – XQuery-based federated query engine – SQL-based complex event processing engine • Various startups prior to that
  • 3.
    Agenda • What isApache Phoenix? • State of the Union • A Deeper Look – Joins and Subquery Support • What’s New? • What’s Next? • Q&A
  • 4.
    What is ApachePhoenix? • A relational database layer for Apache HBase – Query engine • Transforms SQL queries into native HBase API calls • Pushes as much work as possible onto the cluster for parallel execution – Metadata repository • Typed access to data stored in HBase tables – A JDBC driver • A top level Apache Software Foundation project – Originally developed at Salesforce – Now a top-level project at the ASF (Happy Birthday!) – A growing community with momentum
  • 5.
    Where Does PhoenixFit In? Sqoop RDBDataCollector Flume LogDataCollector Zookeeper Coordination YARN (MRv2) Cluster Resource Manager / MapReduce HDFS 2.0 Hadoop Distributed File System GraphX Graph analysis framework Phoenix Query execution engine HBase Distributed Database The Java Virtual Machine Hadoop Common JNI Spark Iterative In-Memory Computation MLLib Data mining Pig Data Manipulation Hive Structured Query Phoenix JDBC client
  • 6.
    State of theUnion • Broad enough SQL support to run TPC queries – Joins, Sub-queries, Derived tables, etc. • Three different secondary indexing strategies – Immutable for write-once/append only data – Global for read-heavy mutable data – Local for write-heavy mutable or immutable data • Statistics driven parallel execution • Tracing and metrics for Monitoring & Management
  • 7.
    Join and SubquerySupport • Grammar: inner join; left/right/full outer join; cross join • Additional: semi join; anti join • Algorithms: hash-join; sort-merge-join • Optimizations: – Predicate push-down – FK-to-PK join optimization – Global index with missing data columns – Correlated subquery rewrite
  • 8.
    TPC Example 1 Small-Quantity-OrderRevenue Query (Q17) select sum(l_extendedprice) / 7.0 as avg_yearly from lineitem, part where p_partkey = l_partkey and p_brand = '[B]' and p_container = '[C]' and l_quantity < ( select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey ); CLIENT 4-WAY FULL SCAN OVER lineitem PARALLEL INNER JOIN TABLE 0 CLIENT 1-WAY FULL SCAN OVER part SERVER FILTER BY p_partkey = ‘[B]’ AND p_container = ‘[C]’ PARALLEL INNER JOIN TABLE 1 CLIENT 4-WAY FULL SCAN OVER lineitem SERVER AGGREGATE INTO DISTINCT ROWS BY l_partkey AFTER-JOIN SERVER FILTER BY l_quantity < $0
  • 9.
    TPC Example 2 OrderPriority Checking Query (Q4) select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date '[D]' and o_orderdate < date '[D]' + interval '3' month and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority; CLIENT 4-WAY FULL SCAN OVER orders SERVER FILTER o_orderdate >= ‘[D]’ AND o_orderdate < ‘[D]’ + 3(d) SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY o_orderpriority CLIENT MERGE SORT SKIP-SCAN JOIN TABLE 0 CLIENT 4-WAY FULL SCAN OVER lineitem SERVER FILTER BY l_commitdate < l_receiptdate DYNAMIC SERVER FILTER BY o_orderkey IN l_orderkey
  • 10.
    Join support -what can't we do? • Nested Loop Join • Statistics Guided Join Algorithm – Smartly choose the smaller table for the build side – Smartly switch between hash-join and sort-merge-join – Smartly turn on/off FK-to-PK join optimization
  • 11.
    What’s New? • HBase1.0 Support • Functional Indexes
  • 12.
    Functional Indexes • Creatingan index on an expression as opposed to just a column value. For example, the following will be a full table scan: SELECT AVG(response_time) FROM SERVER_METRICS WHERE DAYOFMONTH(create_time) = 1 • Adding the following functional index will turn it into a range scan: CREATE INDEX day_of_month_idx ON SERVER_METRICS (DAYOFMONTH(create_time)) INCLUDE (response_time)
  • 13.
    What’s New? • HBase1.0 Support • Functional Indexes • User Defined Functions
  • 14.
    User Defined Functions •Extension points to Phoenix for domain-specific functions. For example, a geo-location application might load a set of UDFs like this: CREATE FUNCTION WOEID_DISTANCE(INTEGER,INTEGER) RETURNS INTEGER AS ‘org.apache.geo.woeidDistance’ USING JAR ‘/lib/geo/geoloc.jar’ • Querying, functional indexing, etc. then possible: SELECT * FROM woeid a JOIN woeid b ON a.country = b.country WHERE woeid_distance(a.ID,b.ID) < 5
  • 15.
    What’s New? • HBase1.0 Support • Functional Indexes • User Defined Functions • Query Server with Thin Driver
  • 16.
    Query Server +Thin Driver • Offloads query planning and execution to different server(s) • Minimizes client dependencies – Enabler for ODBC driver (not available yet, though) • Connect like this instead: Connection conn = DriverManager.getConnection( “jdbc:phoenix:thin:url=http://localhost:8765”); • Still evolving, so no backward compatibility guarantees yet • For more information, see http://phoenix.apache.org/server.html
  • 17.
    What’s New? • HBase1.0 Support • Functional Indexes • User Defined Functions • Query Server with Thin Driver • Union All support • Testing at scale with Pherf • MR index build • Spark integration • Date built-in functions – WEEK, DAYOFMONTH, etc. • Transactions (WIP - will be in next release)
  • 18.
    Transactions • Snapshot isolationmodel – Using Tephra (http://tephra.io/) – Supports REPEABLE_READ isolation level – Allows reading your own uncommitted data • Optional – Enabled on a table by table basis – No performance penalty when not used • Work in progress, but close to release – Try our txn branch – Will be available in next release
  • 19.
    Optimistic Concurrency Control •Avoids cost of locking rows and tables • No deadlocks or lock escalations • Cost of conflict detection and possible rollback is higher • Good if conflicts are rare: short transaction, disjoint partitioning of work • Conflict detection not always necessary: write-once/append- only data
  • 20.
    Tephra Architecture ZooKeeper Tx Manager (standby) HBase Master1 Master 2 RS 1 RS 2 RS 4 RS 3 Client 1 Client 2 Client N Tx Manager (active)
  • 21.
    time out try abort failed rollback in HBase write to HBase do work Client Tx Manager none complete V abortsucceeded in progress start tx start start tx commit try commit check conflicts invalid X invalidate failed Transaction Lifecycle
  • 22.
    Tephra Architecture • TransactionAwareclient • Coordinates transaction lifecycle with manager • Communicates directly with HBase for reads and writes • Transaction Manager • Assigns transaction IDs • Maintains state on in-progress, committed and invalid transactions • Transaction Processor coprocessor • Applies server-side filtering for reads • Cleans up data from failed transactions, and no longer visible versions
  • 23.
    What’s New? • HBase1.0 Support • Functional Indexes • User Defined Functions • Query Server with Thin Driver • Union All support • Testing at scale with Pherf • MR index build • Spark integration • Date built-in functions – WEEK, DAYOFMONTH, etc. • Transactions (WIP - will be in next release)
  • 24.
    What’s Next? • IsPhoenix done? • What about the Big Picture? – How can Phoenix be leveraged in the larger ecosystem? – Hive, Pig, Spark, MR integration with Phoenix exists today, but not a great story
  • 25.
  • 26.
    Introducing Apache Calcite •Query parser, compiler, and planner framework – SQL-92 compliant (ever argue SQL with Julian? :-) ) – Enables Phoenix to get missing SQL support • Pluggable cost-based optimizer framework – Sane way to model push down through rules • Interop with other Calcite adaptors – Not for free, but it becomes feasible – Already used by Drill, Hive, Kylin, Samza – Supports any JDBC source (i.e. RDBMS - remember them :-) ) – One cost-model to rule them all
  • 27.
    How does Phoenixplug in? Calcite Parser & Validator Calcite Query Optimizer Phoenix Query Plan Generator Phoenix Runtime Phoenix Tables over HBase JDBC Client SQL + Phoenix specific grammar Built-in rules + Phoenix specific rules
  • 28.
    Optimization Rules • AggregateRemoveRule •FilterAggregateTransposeRule • FilterJoinRule • FilterMergeRule • JoinCommuteRule • PhoenixFilterScanMergeRule • PhoenixJoinSingleAggregateMergeRule • …
  • 29.
    Query Example (filter push-downand smart join algorithm) LogicalFilter filter: $0 = ‘x’ LogicalJoin type: inner cond: $3 = $7 LogicalProject projects: $0, $5 LogicalTableScan table: A LogicalTableScan table: B PhoenixTableScan table: ‘a’ filter: $0 = ‘x’ PhoenixServerJoin type: inner cond: $3 = $1 PhoenixServerProject projects: $2, $0 Optimizer (with RelOptRules & ConvertRules) PhoenixTableScan table: ‘b’ PhoenixServerProject projects: $0, $2 PhoenixServerProject projects: $0, $3
  • 30.
    Query Example (filter push-downand smart join algorithm) ScanPlan table: ‘a’ skip-scan: pk0 = ‘x’ projects: pk0, c3 HashJoinPlan types {inner} join-keys: {$1} projects: $2, $0 Build hash-key: $1 Phoenix Implementor PhoenixTableScan table: ‘a’ filter: $0 = ‘x’ PhoenixServerJoin type: inner cond: $3 = $1 PhoenixServerProject projects: $2, $0 PhoenixTableScan table: ‘b’ PhoenixServerProject projects: $0, $2 PhoenixServerProject projects: $0, $3 ScanPlan table: ‘b’ projects: col0, col2 Probe
  • 31.
    Interoperibility Example • Joiningdata from Phoenix and mySQL EnumerableJoin PhoenixTableScan JdbcTableScan Phoenix Tables over HBase mySQL Database PhoenixToEnumerable Converter JdbcToEnumerable Converter
  • 32.
    Query Example 1 WITHm AS (SELECT * FROM dept_manager dm WHERE from_date = (SELECT max(from_date) FROM dept_manager dm2 WHERE dm.dept_no = dm2.dept_no)) SELECT m.dept_no, d.dept_name, e.first_name, e.last_name FROM employees e JOIN m ON e.emp_no = m.emp_no JOIN departments d ON d.dept_no = m.dept_no ORDER BY d.dept_no;
  • 33.
    Query Example 2 SELECTdept_no, title, count(*) FROM titles t JOIN dept_emp de ON t.emp_no = de.emp_no WHERE dept_no <= 'd006' GROUP BY rollup(dept_no, title) ORDER BY dept_no, title;
  • 34.

Editor's Notes

  • #5 Who here is already familiar with Phoenix? GitHub -> Incubator -> TLP 1000 msg / month -> 2000 1 year old today
  • #7 TPC = complex queries used to benchmark SQL databases against each other
  • #8 All types of; algorithms. FK-PK opt Useful in global index. Other opt Many TPC queries.
  • #9 a yearly average price for orders of a specific part brand and part container with a quantity less than 20% of the average quantity of orders for the same part. join + correlated subquery. Two opt in query plan: 1st one de-correlation. 2nd one predicate push-down.
  • #10 An example of EXISTS => semi-join Triggers another opt, FK-PK join opt In query plan, SKIP-SCAN-JOIN with a dynamic filter At runtime, a skip-scan not a full-scan on orders table
  • #11 Something missing. 2 join algorithms, hash and merge. Former faster vs. latter for two large tables. How to decide which algorithm? Can’t. Prioritize one. Can’t do the join side either. Are we going to? Yes. Table stats for choosing join algorithm and optimization.
  • #12 Jeffrey & Enis Thomas
  • #14 Rajeshbabu
  • #16 Nick & Julian
  • #18 Maryann, myself, and Alicia Cody & Mujtaba Ravi Josh Mahonin Alicia Thomas, myself, and Gary Helmling
  • #20 Slides courtesy of Gary and Andreas Go to Gary’s talk on CDAP at 4:10
  • #24 Ran out of room – didn’t even mention the 8x perf improvement for unordered, unaggregated queries by Samarth Fantastic work by a lot of people to pull this together
  • #27 Join ordering and other optimizations now possible
  • #28 Details of integration: Position and interact? 1. A customized Parser + Validator 2. Query Optimizer + own table stats + Phoenix rules. 3. Translation process 4. Phoenix Runtime
  • #30 A join query with a WHERE condition. Highlight filter push-down and swap of join tables, called FilterJoinTransposeRule and JoinCommuteRule. Conversion from Logical to Phoenix physical at the same time. Opt: Filter on table ‘A’ … The tree on the right => output A good example of how Calcite can make the decision of join algorithms easy. 
  • #32 Default implementation of backend: Enumerable RelNodes w/ adapters, run Phoenix + other data source. Example of joining Phoenix with JDBC: EnumerableJoin …  Can replace JDBC table with one from other data source.
  • #33 WITH: we don’t have for front-end but equivalent to derived table. Get the grammar from Calcite and run in Phoenix.
  • #34 ROLLUP group-by: part to Phoenix and rest to itself.