harrydevforlife/starlink

Starlink: A Python Query Engine

Making query engines understandable for Python developers

Starlink is an educational query engine implementation in Python, designed to demystify how modern query engines work. Built with clarity and learning in mind, it demonstrates the complete journey from SQL queries to data retrieval.

Philosophy

Starlink aims to make query engines easily understandable for Pythonistas. It focuses on the process, showing you exactly how a query flows through the system:

SQL → Logical Plan → Optimized Plan → Physical Plan → Data Sources 

Inspiration

Special thanks to Andy Grove and How Query Engines Work for inspiring this project. The book provides a comprehensive introduction to query engines, and Starlink aims to make these concepts accessible to Python developers through a clean, educational implementation.

Starlink's additional features

  • Hash join (currently inner joins only).
  • SQL alias expressions (AS).
  • Filter (predicate) pushdown.

Quick Start

Installation

```bash
# Clone the repository
git clone https://github.com/harrydevforlife/starlink.git
cd starlink

# Install dependencies
uv sync

# Set up Python path
export PYTHONPATH=$PWD/src:$PYTHONPATH
```

Basic Usage

Using SQL

```python
from pathlib import Path

from starlink.execution.context import ExecutionContext

# Create execution context
ctx = ExecutionContext({})

# Register a CSV file
ctx.register_csv("tripdata", Path("data/yellow_tripdata_2019-01.csv"))

# Build a query from SQL (the WHERE clause produces the Selection node in the plan below)
df = ctx.sql("""
    SELECT
        passenger_count,
        MAX(fare_amount) AS max_fare
    FROM tripdata
    WHERE CAST(fare_amount AS DOUBLE) > 80
    GROUP BY passenger_count
""")

# View the logical plan
print("Original Plan:")
print(df.logicalPlan().pretty())

# View the optimized plan
print("\nOptimized Plan:")
print(df.optimizedPlan().pretty())

# Execute and get results
result = ctx.execute(df)
result.show()
```

The output looks like this:

```text
Original Plan:
Projection: #0, #1 as max_fare
  Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
    Selection: CAST(#fare_amount AS double) > 80
      Projection: #passenger_count, #fare_amount
        Scan: data/yellow_tripdata_2019-01.csv; projection=None

Optimized Plan:
Projection: #0, #1 as max_fare
  Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
    Projection: #passenger_count, #fare_amount
      Scan: data/yellow_tripdata_2019-01.csv; projection=[passenger_count, fare_amount], filter=CAST(#fare_amount AS double) > 80

| passenger_count | max_fare |
|-----------------|----------|
| 1               | 99.99    |
| 2               | 99.75    |
| 4               | 99       |
| 5               | 99.5     |
| 6               | 98       |
| 3               | 99.75    |
| 0               | 99       |
| 8               | 87       |
| 9               | 92       |
(9 row(s) shown)
```

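Joins are written in SQL. Assuming `customers` and `orders` tables have been registered (for example with `register_csv`), an inner join looks like this: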

```sql
SELECT c.name, c.city, o.order_id, o.total
FROM customers AS c
JOIN orders AS o ON c.customer_id = o.order_customer_id
WHERE CAST(o.total AS DOUBLE) > 30 AND c.city = 'New York'
```
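Starlink executes joins as hash joins. The sketch below is a rough illustration of that technique, not Starlink's implementation. It builds a hash table on the join key of one input and probes it with rows from the other. Rows are modelled as plain dicts, and the `hash_join` function and sample data are hypothetical. The WHERE clause from the query above is applied afterwards as a plain filter.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]  # hypothetical row representation


def hash_join(left: Iterable[Row], right: Iterable[Row],
              left_key: str, right_key: str) -> List[Row]:
    """Inner hash join: build on `left`, probe with `right`."""
    # Build phase: index every left row by its join key.
    table: Dict[Any, List[Row]] = defaultdict(list)
    for row in left:
        table[row[left_key]].append(row)

    # Probe phase: emit one merged row per matching pair.
    # .get() avoids inserting empty entries into the defaultdict.
    result: List[Row] = []
    for row in right:
        for match in table.get(row[right_key], []):
            result.append({**match, **row})
    return result


customers = [
    {"customer_id": 1, "name": "Ann", "city": "New York"},
    {"customer_id": 2, "name": "Bob", "city": "Boston"},
]
orders = [
    {"order_id": 10, "order_customer_id": 1, "total": "42.5"},
    {"order_id": 11, "order_customer_id": 2, "total": "12.0"},
    {"order_id": 12, "order_customer_id": 1, "total": "25.0"},
]

joined = hash_join(customers, orders, "customer_id", "order_customer_id")
# WHERE CAST(o.total AS DOUBLE) > 30 AND c.city = 'New York'
filtered = [r for r in joined
            if float(r["total"]) > 30 and r["city"] == "New York"]
print([(r["name"], r["order_id"]) for r in filtered])  # [('Ann', 10)]
```

Building the table on the smaller input keeps memory low, and the larger input can then be streamed through the probe phase.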

Using DataFrame API

```python
from starlink.execution.context import ExecutionContext
from starlink.logicalplan.expressions import col, Max, Count

ctx = ExecutionContext({})

# Group by passenger_count and compute MAX and COUNT of fare_amount
df = (
    ctx.csv("data/yellow_tripdata_2019-01.csv")
    .aggregate(
        [col("passenger_count")],
        [Max(col("fare_amount")), Count(col("fare_amount"))],
    )
)

result = ctx.execute(df)
result.show()
```

Architecture: The Query Journey

Starlink demonstrates the complete query execution pipeline. Here's how a query flows through the system:

1. SQL Parsing (sql/)

Input: SQL string
Output: Abstract Syntax Tree (AST)

```text
# SQL: "SELECT passenger_count, MAX(fare_amount) FROM tripdata GROUP BY passenger_count"
#   ↓
# SqlSelect AST with projection, table, and groupBy information
```

Key Components:

  • sql_tokenizer.py - Splits a SQL string into tokens
  • sql_parser.py - Parses tokens into a SQL AST using a Pratt parser
  • sql_expr.py - Defines SQL expression types
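The tokenizer/parser split can be illustrated on the expression part of a WHERE clause. This is a self-contained sketch, not Starlink's sql_tokenizer.py or sql_parser.py. The token regex, the binding-power table, and the tuple-based AST are assumptions chosen for brevity. The core Pratt idea is that each infix operator has a binding power, and the parser keeps absorbing operators only while they bind tighter than the current level.

```python
import re
from typing import List, Optional, Tuple, Union

# Hypothetical token pattern: numbers, identifiers/keywords, operators, parentheses.
TOKEN_RE = re.compile(
    r"\s*(?:(\d+(?:\.\d+)?)|([A-Za-z_][A-Za-z0-9_]*)|(>=|<=|!=|[=<>+\-*/()]))"
)

# Binding power per infix operator: higher binds tighter.
BINDING_POWER = {
    "OR": 1, "AND": 2,
    "=": 3, "!=": 3, "<": 3, ">": 3, "<=": 3, ">=": 3,
    "+": 4, "-": 4,
    "*": 5, "/": 5,
}

Expr = Union[str, Tuple]


def tokenize(text: str) -> List[str]:
    """Split an expression into tokens; raise on characters it does not know."""
    text = text.rstrip()
    tokens: List[str] = []
    pos = 0
    while pos < len(text):
        m = TOKEN_RE.match(text, pos)
        if m is None:
            raise SyntaxError(f"unexpected character at position {pos}")
        tokens.append(m.group(m.lastindex))  # the one alternative that matched
        pos = m.end()
    return tokens


class PrattParser:
    def __init__(self, tokens: List[str]) -> None:
        self.tokens = tokens
        self.pos = 0

    def peek(self) -> Optional[str]:
        return self.tokens[self.pos] if self.pos < len(self.tokens) else None

    def advance(self) -> Optional[str]:
        token = self.peek()
        self.pos += 1
        return token

    def parse(self, min_bp: int = 0) -> Expr:
        # Prefix position: a literal, an identifier, or a parenthesised sub-expression.
        token = self.advance()
        if token is None:
            raise SyntaxError("unexpected end of input")
        if token == ")" or token.upper() in BINDING_POWER:
            raise SyntaxError(f"unexpected token {token!r}")
        if token == "(":
            left = self.parse(0)
            if self.advance() != ")":
                raise SyntaxError("expected ')'")
        else:
            left = token
        # Infix loop: absorb operators that bind tighter than the caller's level.
        while True:
            op = self.peek()
            if op is None:
                break
            bp = BINDING_POWER.get(op.upper())
            if bp is None or bp <= min_bp:
                break
            self.advance()
            left = (op.upper(), left, self.parse(bp))
        return left


def parse_expr(text: str) -> Expr:
    parser = PrattParser(tokenize(text))
    expr = parser.parse()
    if parser.peek() is not None:
        raise SyntaxError(f"unexpected token {parser.peek()!r}")
    return expr


print(parse_expr("fare_amount > 80 AND passenger_count = 1"))
# ('AND', ('>', 'fare_amount', '80'), ('=', 'passenger_count', '1'))
```

Using `<=` rather than `<` in the `bp <= min_bp` check makes operators of equal precedence left-associative. For example, `1 - 2 - 3` parses as `(1 - 2) - 3`.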

2. Logical Planning (logicalplan/)

Input: SQL AST
Output: Logical Plan (what to do, not how)

```text
# Logical Plan represents the "what": high-level operations
Projection: #0, #1
  Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
    Scan: tripdata; projection=None
```

Key Components:

  • scan.py - Represents reading from a data source
  • projection.py - Represents column selection/transformation
  • aggregate.py - Represents grouping and aggregation
  • select.py - Represents filtering (WHERE clause)
  • expressions.py - Logical expressions (Column, Max, Count, etc.)
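A logical plan is a tree of nodes, each describing one operation and pointing at its input. The sketch below uses hypothetical classes named after scan.py, aggregate.py, and projection.py; they are not Starlink's actual classes. Expressions are plain strings for brevity. The `pretty()` method prints one line per node, with each child indented under its parent, which yields the plan text shown above.

```python
from dataclasses import dataclass
from typing import List, Optional


class LogicalPlan:
    """Hypothetical base class: a node knows its children and describes itself."""

    def children(self) -> List["LogicalPlan"]:
        return []

    def describe(self) -> str:
        raise NotImplementedError

    def pretty(self, indent: int = 0) -> str:
        # One line per node; each child is indented one level below its parent.
        lines = ["  " * indent + self.describe()]
        for child in self.children():
            lines.append(child.pretty(indent + 1))
        return "\n".join(lines)


@dataclass
class Scan(LogicalPlan):
    source: str
    projection: Optional[List[str]] = None  # None means "all columns"

    def describe(self) -> str:
        proj = "None" if self.projection is None else "[" + ", ".join(self.projection) + "]"
        return f"Scan: {self.source}; projection={proj}"


@dataclass
class Aggregate(LogicalPlan):
    child: LogicalPlan
    group_expr: List[str]
    aggregate_expr: List[str]

    def children(self) -> List[LogicalPlan]:
        return [self.child]

    def describe(self) -> str:
        groups = ", ".join(f"#{name}" for name in self.group_expr)
        aggs = ", ".join(self.aggregate_expr)
        return f"Aggregate: groupExpr=[{groups}], aggregateExpr=[{aggs}]"


@dataclass
class Projection(LogicalPlan):
    child: LogicalPlan
    exprs: List[str]

    def children(self) -> List[LogicalPlan]:
        return [self.child]

    def describe(self) -> str:
        return "Projection: " + ", ".join(self.exprs)


plan = Projection(
    Aggregate(Scan("tripdata"), ["passenger_count"], ["MAX(#fare_amount)"]),
    ["#0", "#1"],
)
print(plan.pretty())
```

Because nodes only describe *what* to compute, an optimizer can rewrite the tree freely before any data is read.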

3. Query Optimization (optimizer/)

Input: Logical Plan
Output: Optimized Logical Plan

```text
# Optimizer applies rules to improve the plan
# Example: Projection Pushdown
Projection: #0, #1
  Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(#fare_amount)]
    Scan: tripdata; projection=[passenger_count, fare_amount]  # ← Pushed down!
```

Optimization Rules:

  • Projection Pushdown - Only read columns that are actually needed
  • Predicate Pushdown - Only read rows that are actually needed
  • More rules can be added following the OptimizerRule interface
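A projection pushdown rule walks the plan from the root downwards, accumulating the columns each operator references, and hands the final set to the scan. The sketch below uses three hypothetical dataclasses in place of Starlink's logical plan classes. As a simplification, column references are stored as plain name lists rather than expression trees. The rule returns a rewritten copy and leaves the input plan untouched.

```python
from dataclasses import dataclass, replace
from typing import List, Optional, Set, Union


@dataclass
class Scan:
    source: str
    schema: List[str]                       # every column the source can produce
    projection: Optional[List[str]] = None  # None means "read all columns"


@dataclass
class Selection:
    child: "Plan"
    filter_columns: List[str]  # columns referenced by the WHERE predicate


@dataclass
class Aggregate:
    child: "Plan"
    group_by: List[str]
    agg_inputs: List[str]      # columns fed into MAX, COUNT, ...


Plan = Union[Scan, Selection, Aggregate]


def push_down_projection(plan: Plan, needed: Optional[Set[str]] = None) -> Plan:
    """Return a copy of `plan` whose Scan reads only the referenced columns."""
    needed = set() if needed is None else needed
    if isinstance(plan, Aggregate):
        # An aggregate's input only needs its grouping and aggregated columns.
        cols = set(plan.group_by) | set(plan.agg_inputs)
        return replace(plan, child=push_down_projection(plan.child, cols))
    if isinstance(plan, Selection):
        # A filter passes through what its parent needs plus what it tests.
        cols = needed | set(plan.filter_columns)
        return replace(plan, child=push_down_projection(plan.child, cols))
    if isinstance(plan, Scan):
        # Keep the source's column order so the output schema is deterministic.
        return replace(plan, projection=[c for c in plan.schema if c in needed])
    raise TypeError(f"unsupported plan node: {plan!r}")


schema = ["vendor_id", "passenger_count", "trip_distance", "fare_amount", "tip_amount"]
plan = Aggregate(
    Selection(Scan("tripdata", schema), ["fare_amount"]),
    group_by=["passenger_count"],
    agg_inputs=["fare_amount"],
)
optimized = push_down_projection(plan)
print(optimized.child.child.projection)  # ['passenger_count', 'fare_amount']
```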

4. Physical Planning (queryplanner/)

Input: Optimized Logical Plan
Output: Physical Plan (how to execute)

```text
# Physical Plan represents the "how": concrete execution steps
HashAggregateExec: groupExpr=[ColumnExpression(0)], aggregateExpr=[MaxExpression(ColumnExpression(1))]
  ScanExec: projection=[passenger_count, fare_amount]
```

Key Components:

  • queryplanner.py - Converts logical plans to physical plans
  • Physical operators in physicalplan/:
    • ScanExec - Scans data sources
    • ProjectionExec - Applies projections
    • SelectionExec - Applies filters
    • HashAggregateExec - Performs hash-based aggregation
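As the name HashAggregateExec suggests, hash-based aggregation keeps a hash table from group key to running aggregate state and updates it as each batch arrives. The sketch below is a minimal pure-Python version under two assumptions: batches are dicts of column lists (Starlink's RecordBatch wraps Arrow arrays instead), and only MAX and COUNT are computed. NULL handling is omitted.

```python
from typing import Any, Dict, Iterable, List

# Hypothetical batch format: column name -> list of values.
Batch = Dict[str, List[Any]]


def hash_aggregate(batches: Iterable[Batch], group_col: str, value_col: str) -> Batch:
    """GROUP BY group_col, computing MAX(value_col) and COUNT(value_col)."""
    # The hash table survives across batches: group key -> [running max, count].
    state: Dict[Any, List[Any]] = {}
    for batch in batches:
        for key, value in zip(batch[group_col], batch[value_col]):
            acc = state.get(key)
            if acc is None:
                state[key] = [value, 1]  # first row seen for this group
            else:
                acc[0] = max(acc[0], value)
                acc[1] += 1
    # One output row per distinct key (dicts preserve first-seen order).
    return {
        group_col: list(state),
        f"MAX({value_col})": [acc[0] for acc in state.values()],
        f"COUNT({value_col})": [acc[1] for acc in state.values()],
    }


batches = [
    {"passenger_count": [1, 2, 1], "fare_amount": [10.0, 20.0, 35.5]},
    {"passenger_count": [2, 3], "fare_amount": [15.0, 8.0]},
]
print(hash_aggregate(batches, "passenger_count", "fare_amount"))
```

Group 2 appears in both batches, and its state correctly combines values from each. This is why the table must outlive any single batch.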

5. Execution (execution/ + physicalplan/)

Input: Physical Plan
Output: RecordBatch results

```python
# Physical operators execute the plan.
# Each operator produces RecordBatch objects (columnar data).
results = physical_plan.execute_batches()  # Returns: Sequence[RecordBatch]

# Through the execution context:
results = ctx.execute(df)  # Returns: QueryResult
```

Execution Flow:

  1. ExecutionContext.execute() - Entry point
  2. Optimizes the logical plan
  3. Creates physical plan
  4. Executes physical plan
  5. Returns results as RecordBatch objects
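The execution steps above can be sketched as a tree of operators, each pulling batches from its child. Every class here is a toy stand-in named after the physical operators listed earlier, not Starlink's code. Batches are dicts of column lists, and the `execute()` method and module-level `execute` function are assumptions; in Starlink, `ctx.execute(df)` is the real entry point. Because each operator is a generator, no work happens until the root is iterated.

```python
from typing import Any, Callable, Dict, Iterator, List

# Hypothetical batch format: column name -> list of values (all the same length).
Batch = Dict[str, List[Any]]


class ScanExec:
    """Leaf operator: yields pre-loaded batches (stands in for a file reader)."""

    def __init__(self, batches: List[Batch]) -> None:
        self.batches = batches

    def execute(self) -> Iterator[Batch]:
        yield from self.batches


class SelectionExec:
    """Keeps the rows for which the predicate's boolean mask is True."""

    def __init__(self, child, predicate: Callable[[Batch], List[bool]]) -> None:
        self.child = child
        self.predicate = predicate

    def execute(self) -> Iterator[Batch]:
        for batch in self.child.execute():
            mask = self.predicate(batch)
            yield {name: [v for v, keep in zip(col, mask) if keep]
                   for name, col in batch.items()}


class ProjectionExec:
    """Keeps only the listed columns of each batch."""

    def __init__(self, child, columns: List[str]) -> None:
        self.child = child
        self.columns = columns

    def execute(self) -> Iterator[Batch]:
        for batch in self.child.execute():
            yield {name: batch[name] for name in self.columns}


def execute(plan) -> List[Batch]:
    """Hypothetical entry point: pull every batch from the root operator."""
    return list(plan.execute())


scan = ScanExec([
    {"passenger_count": [1, 2], "fare_amount": [90.0, 50.0], "tip_amount": [1.0, 2.0]},
    {"passenger_count": [3], "fare_amount": [85.0], "tip_amount": [0.0]},
])
plan = ProjectionExec(
    SelectionExec(scan, lambda b: [fare > 80 for fare in b["fare_amount"]]),
    ["passenger_count", "fare_amount"],
)
print(execute(plan))
```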

6. Data Sources (datasources/)

Input: File paths or data
Output: RecordBatch streams

Supported Formats:

  • CSV (csv.py) - Automatic delimiter detection and header support
  • Parquet (parquet.py) - Efficient columnar format
  • Memory (memory.py) - In-memory data source, intended for testing only
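Delimiter and header detection for CSV can be done with the standard library's `csv.Sniffer`. Whether Starlink's csv.py uses this approach is an assumption, and the `read_csv_columns` helper below is hypothetical. The helper sniffs a sample of the text, picks a delimiter from a restricted set, guesses whether the first row is a header, and returns the data column by column. Values stay as strings; type inference is a separate step.

```python
import csv
import io
from typing import Dict, List


def read_csv_columns(text: str) -> Dict[str, List[str]]:
    """Detect the delimiter and header, then return the data column by column."""
    sample = text[:4096]
    sniffer = csv.Sniffer()
    # Restrict the guess to common delimiters to avoid picking e.g. '.' or ':'.
    dialect = sniffer.sniff(sample, delimiters=",;\t|")
    has_header = sniffer.has_header(sample)  # heuristic: it can guess wrong
    rows = list(csv.reader(io.StringIO(text), dialect))
    if has_header:
        header, body = rows[0], rows[1:]
    else:
        header, body = [f"column_{i}" for i in range(len(rows[0]))], rows
    return {name: [row[i] for row in body] for i, name in enumerate(header)}


data = "passenger_count;fare_amount\n1;10.5\n2;99.75\n"
print(read_csv_columns(data))
# {'passenger_count': ['1', '2'], 'fare_amount': ['10.5', '99.75']}
```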

Key Concepts

Logical vs Physical Plans

Logical Plan (What):

  • High-level, declarative representation
  • Independent of execution strategy
  • Example: "Aggregate by passenger_count"

Physical Plan (How):

  • Concrete execution strategy
  • Specific algorithms chosen
  • Example: "HashAggregateExec using hash table"

Columnar Data Format

Starlink uses Apache Arrow's columnar format:

  • RecordBatch - A batch of rows stored column-wise
  • ColumnVector - Interface for column data
  • ArrowFieldVector - PyArrow-based implementation

This format enables:

  • Efficient vectorized operations
  • Better cache locality
  • Reduced memory overhead
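The difference between row and columnar layout can be sketched with the standard library's `array` module standing in for Arrow buffers. This substitution keeps the example dependency-free; Starlink itself uses PyArrow. Each column lives in one contiguous typed buffer, so a filter on `fare_amount` reads only that buffer. The resulting mask is then applied only to the columns still needed. In PyArrow the mask would come from a compute kernel rather than a Python loop.

```python
from array import array
from typing import Dict, List

# Row-oriented: one dict per row; reading one field touches every row object.
rows = [
    {"passenger_count": 1, "fare_amount": 12.5, "tip_amount": 2.0},
    {"passenger_count": 2, "fare_amount": 85.0, "tip_amount": 0.0},
    {"passenger_count": 1, "fare_amount": 99.99, "tip_amount": 5.0},
]

# Column-oriented: one contiguous typed buffer per column, like an Arrow array.
columns: Dict[str, array] = {
    "passenger_count": array("q", (r["passenger_count"] for r in rows)),
    "fare_amount": array("d", (r["fare_amount"] for r in rows)),
    "tip_amount": array("d", (r["tip_amount"] for r in rows)),
}

# The filter scans a single buffer and produces a selection mask...
mask: List[bool] = [fare > 80 for fare in columns["fare_amount"]]

# ...which is applied column by column, skipping columns no longer needed.
selected = {
    name: array(col.typecode, (v for v, keep in zip(col, mask) if keep))
    for name, col in columns.items()
    if name != "tip_amount"
}
print(selected["passenger_count"].tolist(), selected["fare_amount"].tolist())
# [2, 1] [85.0, 99.99]
```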

Projection Pushdown

A key optimization that pushes column selection down to the data source:

```text
# Without optimization: read all columns, then discard the unused ones
Scan: tripdata; projection=None                             # Reads all 18 columns

# With optimization: only read the needed columns
Scan: tripdata; projection=[passenger_count, fare_amount]   # Reads only 2 columns
```

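This reduces the amount of data parsed and held in memory, which matters most for wide tables (here, 2 of 18 columns). Columnar formats such as Parquet can also skip reading unused columns from disk entirely.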

Predicate Pushdown

A key optimization that pushes filters (WHERE predicates) down to the data source:

```text
# Without optimization: read every row, then filter in a separate Selection
Scan: tripdata; filter=None                                  # Emits every row

# With optimization: the scan applies the filter itself
Scan: tripdata; filter=CAST(#fare_amount AS double) > 80     # Emits only matching rows
```
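The sketch below shows the effect of pushing a predicate into the scan. The `scan_csv` function is hypothetical, and rows are plain dicts of strings. For CSV the whole file is still parsed, so the saving is in rows materialised and handed to later operators. Formats such as Parquet can additionally skip whole row groups using their stored column statistics. Both plans return the same rows; only where the filtering happens differs.

```python
import csv
import io
from typing import Callable, Dict, List, Optional

Row = Dict[str, str]


def scan_csv(text: str, projection: List[str],
             predicate: Optional[Callable[[Row], bool]] = None) -> List[Row]:
    """Hypothetical scan: apply the pushed-down filter before emitting rows."""
    out: List[Row] = []
    for row in csv.DictReader(io.StringIO(text)):
        if predicate is not None and not predicate(row):
            continue  # rejected at the source, never passed downstream
        out.append({c: row[c] for c in projection})
    return out


data = (
    "passenger_count,fare_amount,tip_amount\n"
    "1,12.5,2.0\n"
    "2,85.0,0.0\n"
    "1,99.99,5.0\n"
)
cols = ["passenger_count", "fare_amount"]

# Without pushdown: the scan emits every row and a later Selection filters them.
all_rows = scan_csv(data, cols)
late = [r for r in all_rows if float(r["fare_amount"]) > 80]

# With pushdown: CAST(#fare_amount AS double) > 80 is evaluated inside the scan.
early = scan_csv(data, cols, predicate=lambda r: float(r["fare_amount"]) > 80)

print(len(all_rows), len(early))  # 3 2
```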

Project Structure

```text
starlink/
├── datatypes/     # Core data types (Schema, RecordBatch, ColumnVector)
├── datasources/   # Data source implementations (CSV, Parquet)
├── logicalplan/   # Logical plan nodes and expressions
├── optimizer/     # Query optimization rules
├── physicalplan/  # Physical execution operators
├── queryplanner/  # Logical → Physical plan conversion
├── sql/           # SQL parsing and planning
└── execution/     # Execution context and coordination
```

Examples

Check out the examples/ directory for more examples:

  • query_sql.py - SQL query examples
  • query_csv.py - DataFrame API with CSV
  • query_parquet.py - Parquet file queries
  • query_count.py - COUNT aggregation examples
  • query_sql_join.py - SQL join examples
  • queries.py - SQL query examples

Extending Starlink

Adding a New Optimization Rule

```python
from starlink.logicalplan.logical import LogicalPlan
from starlink.optimizer.optimizer import OptimizerRule


class MyOptimizationRule(OptimizerRule):
    def optimize(self, plan: LogicalPlan) -> LogicalPlan:
        # Your optimization logic here: build and return a rewritten plan.
        # Returning the input unchanged is a valid no-op rule.
        return plan
```

Adding a New Data Source

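```python
from __future__ import annotations  # type hints below are not evaluated at import time

from typing import List, Optional, Sequence

from starlink.datasources.datasource import DataSource
from starlink.datatypes.record_batch import RecordBatch

# Schema (datatypes/) and Expression (logicalplan/) should also be imported
# from their Starlink modules for type checking.


class MyDataSource(DataSource):
    def schema(self) -> Schema:
        # Return the schema of this source
        ...

    def scan(self, projection: List[str], filter: Optional[Expression] = None) -> Sequence[RecordBatch]:
        # Return the RecordBatch stream, reading only the `projection` columns
        # and, if `filter` is given, only the rows that match it
        ...
```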

Adding a New Aggregate Function

  1. Add logical expression in logicalplan/expressions.py
  2. Add physical expression in physicalplan/expressions/
  3. Add accumulator in the aggregate expression
  4. Update queryplanner.py to handle the new function
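The function's behaviour lives mostly in step 3. The sketch below assumes an accumulator with `accumulate()` and `final_value()` methods, similar to the accumulator design in How Query Engines Work. The class and method names, and the AVG function itself, are hypothetical. Check Starlink's existing Max and Count accumulators for the real interface. One accumulator instance holds the running state for one group.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class Accumulator(ABC):
    """Hypothetical interface: one instance holds the running state for one group."""

    @abstractmethod
    def accumulate(self, value: Any) -> None:
        """Fold one input value into the state."""

    @abstractmethod
    def final_value(self) -> Any:
        """Return the aggregate result once all input has been seen."""


class AvgAccumulator(Accumulator):
    """State for a hypothetical AVG aggregate: a running sum and non-NULL count."""

    def __init__(self) -> None:
        self.total = 0.0
        self.count = 0

    def accumulate(self, value: Any) -> None:
        if value is None:  # SQL aggregates ignore NULL inputs
            return
        self.total += float(value)  # CSV values may arrive as strings
        self.count += 1

    def final_value(self) -> Optional[float]:
        # AVG over zero non-NULL inputs is NULL in SQL.
        return self.total / self.count if self.count else None


acc = AvgAccumulator()
for v in [10, None, 20, "30"]:
    acc.accumulate(v)
print(acc.final_value())  # 20.0
```

AVG keeps a sum and a count rather than a running average. This keeps each update cheap and exact, and two partial states could be merged by adding their fields.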

Testing

```bash
# Run all tests
PYTHONPATH=src python -m pytest tests/ -v

# Run a specific test suite
PYTHONPATH=src python -m pytest tests/starlink/optimizer/ -v
```

Performance

Starlink is designed for educational purposes and focuses on clarity over performance. However, it demonstrates:

  • Columnar data processing with Apache Arrow
  • Projection pushdown optimization
  • Efficient batch processing
  • Memory-efficient streaming

See the benchmarking results for more details.

For production use, consider:

Contributing

Contributions are welcome! Areas where help is needed:

  • Additional optimization rules
  • More data source formats
  • Additional SQL features
  • Performance improvements
  • Documentation improvements
  • Test coverage

License

Acknowledgments

  • Andy Grove and How Query Engines Work - For the excellent educational resource that inspired this project
  • Apache Arrow - For the columnar data format and PyArrow library
  • The Python data engineering community - For continuous inspiration

Resources

Roadmap

  • Additional SQL features (more JOIN types, subqueries, etc.); inner join is already supported.
  • More optimization rules; projection pushdown and predicate pushdown are already implemented.
  • Additional data sources (JSON, database connectors)
  • Query execution statistics
  • Better error messages
  • Performance profiling tools

Happy Querying and Exploring!

Starlink - Making query engines understandable, one Python line at a time.

About

Starlink – an educational SQL query engine in Python, built on Arrow.
