An end-to-end data engineering pipeline built on the Brazilian E-Commerce (Olist) public dataset.
This project ingests ~100K orders from Kaggle, transforms them through a modern data stack (PySpark, S3, Snowflake, dbt), orchestrates with Airflow, and surfaces insights in a Streamlit dashboard -- all provisioned with Terraform and validated through CI/CD.
```
DataFlow Pro -- System Architecture
============================================================================

 +----------+     +-----------+     +----------+     +----------+     +------------+
 |          |     |           |     |          |     |          |     |            |
 |  Kaggle  | --> |  PySpark  | --> |  AWS S3  | --> | Snowflake| --> | Streamlit  |
 | (source) |     | (ingest)  |     |  (lake)  |     |  (DWH)   |     | (dashboard)|
 |          |     |           |     |          |     |          |     |            |
 +----------+     +-----------+     +----------+     +----+-----+     +------------+
                                                          |
                                                     +----+-----+
                                                     |          |
                                                     |   dbt    |
                                                     | (models) |
                                                     |          |
                                                     +----------+

 Orchestration:  Apache Airflow
 Infrastructure: Terraform
 CI/CD:          GitHub Actions
 Monitoring:     dbt tests + Airflow alerts
============================================================================
```

Data flow in detail:
- Extract -- Download 9 CSV files (~100K orders, 8 entity tables) from Kaggle via the API.
- Transform & Load (Ingestion) -- PySpark reads raw CSVs, applies schema validation, deduplication, timestamp casting, and string standardization, then writes clean Parquet files to S3.
- Load into Warehouse -- Snowflake COPY INTO loads Parquet from the S3 stage into the RAW schema.
- Model (dbt) -- Three-layer dbt project (staging, intermediate, marts) builds a clean analytics-ready star schema.
- Serve -- Streamlit dashboard queries the marts layer to visualize KPIs, trends, and geographic distribution.
| Layer | Technology | Purpose |
|---|---|---|
| Source Data | Kaggle API | Download the Olist Brazilian E-Commerce dataset |
| Ingestion | PySpark 3.5 | Schema enforcement, cleaning, Parquet conversion |
| Storage | AWS S3 | Data lake (raw + processed zones) |
| Warehouse | Snowflake | Cloud data warehouse with RBAC |
| Transformation | dbt-core 1.8 | SQL-based ELT modeling (staging/intermediate/marts) |
| Orchestration | Apache Airflow 2.9 | DAG scheduling and pipeline monitoring |
| Visualization | Streamlit 1.35 + Plotly | Interactive analytics dashboard |
| Infrastructure | Terraform 1.5+ | IaC for S3, IAM, and Snowflake resources |
| CI/CD | GitHub Actions | Automated testing, linting, deployment |
| Containerization | Docker + Docker Compose | Reproducible local development |
| Testing | pytest, dbt tests, Great Expectations (dbt_expectations) | Data quality at every layer |
```
ecommerce/
|-- .github/
|   `-- workflows/
|       |-- dbt_ci.yml              # dbt CI on pull requests
|       `-- deploy.yml              # Full deployment pipeline
|
|-- dashboard/
|   |-- app.py                      # Streamlit application entrypoint
|   |-- requirements.txt            # Dashboard Python dependencies
|   `-- utils/
|       `-- snowflake_connector.py  # Snowflake connection manager
|
|-- data/
|   `-- .gitkeep                    # Placeholder (raw CSVs downloaded here)
|
|-- dbt_project/
|   |-- dbt_project.yml             # dbt project configuration
|   |-- packages.yml                # dbt packages (dbt_utils, dbt_expectations)
|   |-- profiles.yml                # Connection profiles (dev, prod, ci)
|   `-- models/
|       `-- staging/
|           |-- _staging__sources.yml
|           |-- _staging__models.yml
|           |-- stg_orders.sql
|           |-- stg_order_items.sql
|           |-- stg_customers.sql
|           |-- stg_products.sql
|           |-- stg_sellers.sql
|           |-- stg_payments.sql
|           |-- stg_reviews.sql
|           `-- stg_geolocation.sql # (referenced in models.yml)
|
|-- infrastructure/
|   |-- main.tf                     # Root Terraform module
|   |-- variables.tf                # Input variables
|   |-- outputs.tf                  # Output values
|   |-- environments/               # Per-environment tfvars
|   `-- modules/
|       |-- s3/                     # S3 buckets (raw + processed)
|       |-- iam/                    # Cross-service IAM roles
|       `-- snowflake/              # Snowflake DB objects via Terraform
|
|-- ingestion/
|   |-- notebooks/
|   |   `-- 01_ingest_csv.py        # PySpark batch ingestion script
|   |-- src/
|   |   |-- schemas.py              # PySpark StructType schemas for all 8 tables
|   |   |-- transformations.py      # Reusable transform functions
|   |   `-- utils.py                # Spark session, path helpers, Parquet writer
|   `-- tests/
|       `-- __init__.py
|
|-- orchestration/
|   |-- dags/                       # Airflow DAG definitions
|   |-- plugins/                    # Custom Airflow operators
|   `-- config/                     # Airflow configuration overrides
|
|-- scripts/
|   |-- download_dataset.py         # Kaggle dataset downloader
|   |-- run_pipeline.sh             # End-to-end pipeline runner
|   `-- setup_env.sh                # Environment bootstrap script
|
|-- snowflake/
|   `-- setup/
|       |-- 01_databases.sql        # Create ECOMMERCE database
|       |-- 02_schemas.sql          # Create RAW/STAGING/INTERMEDIATE/MARTS + RBAC
|       |-- 03_warehouses.sql       # Create compute warehouses
|       |-- 04_stages.sql           # Create S3 external stages
|       `-- 05_file_formats.sql     # Create CSV/Parquet file formats
|
|-- .env.example                    # Template for environment variables
|-- .gitignore
|-- docker-compose.yml              # Airflow + Streamlit local stack
|-- Dockerfile.airflow
|-- Dockerfile.streamlit
|-- Makefile                        # Developer task runner
|-- requirements.txt                # Core Python dependencies
`-- requirements-dev.txt            # Development/testing dependencies
```

| Requirement | Version | Notes |
|---|---|---|
| Python | 3.11+ | Used for PySpark ingestion, dbt, and Streamlit |
| Docker | 24+ | Required for Airflow and containerized deployments |
| Docker Compose | 2.20+ | Multi-service orchestration |
| Snowflake Account | -- | Free trial at signup.snowflake.com |
| AWS Account | -- | S3 bucket for the data lake layer |
| Kaggle API Key | -- | Download from kaggle.com/settings |
| Terraform | 1.5+ | Only if provisioning infrastructure via IaC |
```bash
# 1. Clone the repository
git clone https://github.com/<your-username>/dataflow-pro-ecommerce.git
cd dataflow-pro-ecommerce

# 2. Run the automated setup script
bash scripts/setup_env.sh

# 3. Configure credentials
cp .env.example .env
# Edit .env with your Snowflake, AWS, and Kaggle credentials

# 4. Activate the virtual environment
source .venv/bin/activate

# 5. Download the Olist dataset from Kaggle
make download

# 6. Run the full pipeline (ingest -> transform -> test)
make pipeline

# 7. Launch the Streamlit dashboard
make dashboard
```

Tip: Run `make help` to see all available commands.
The ingestion layer reads the 8 raw Olist CSV files, enforces PySpark schemas, and writes clean Parquet output.
```bash
# Ingest all tables
make ingest

# Or run the PySpark script directly with options
python ingestion/notebooks/01_ingest_csv.py \
  --input-dir data/raw \
  --output-dir data/processed \
  --tables orders customers products
```

What the ingestion pipeline does:
- Applies predefined `StructType` schemas for type safety
- Cleans column names (lowercase, strip whitespace)
- Removes duplicate rows
- Casts timestamp strings to proper `TimestampType`
- Standardizes city/state strings (trim + lowercase)
- Filters rows with null primary keys
- Appends `_ingested_at` and `_source_file` audit columns
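The cleaning rules above can be sketched in pure Python. This is illustrative only: the real pipeline applies these steps with PySpark DataFrame operations in `ingestion/src/transformations.py`, and the function and argument names here are hypothetical.

```python
from datetime import datetime, timezone

def clean_rows(rows, primary_key, source_file):
    """Illustrative pure-Python version of the ingestion cleaning rules
    (the actual implementation uses PySpark)."""
    seen = set()
    cleaned = []
    for row in rows:
        # Clean column names: lowercase, strip whitespace
        row = {k.strip().lower(): v for k, v in row.items()}
        # Filter rows with a null primary key
        if row.get(primary_key) is None:
            continue
        # Remove duplicate rows (dedup key = full row contents)
        key = tuple(sorted(row.items(), key=lambda kv: kv[0]))
        if key in seen:
            continue
        seen.add(key)
        # Standardize city/state strings (trim + lowercase)
        for col in ("city", "state"):
            if isinstance(row.get(col), str):
                row[col] = row[col].strip().lower()
        # Append audit columns
        row["_ingested_at"] = datetime.now(timezone.utc).isoformat()
        row["_source_file"] = source_file
        cleaned.append(row)
    return cleaned
```

Timestamp casting is omitted here for brevity; in PySpark it is a `to_timestamp` cast driven by the `StructType` schemas.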
Run the SQL setup scripts in order against your Snowflake account. These create the database, schemas (RAW, STAGING, INTERMEDIATE, MARTS), warehouses, external stages, file formats, and RBAC roles.
```bash
# Review the setup order
make snowflake-setup

# Execute in Snowflake UI or via SnowSQL:
#   snowflake/setup/01_databases.sql
#   snowflake/setup/02_schemas.sql
#   snowflake/setup/03_warehouses.sql
#   snowflake/setup/04_stages.sql
#   snowflake/setup/05_file_formats.sql
```

Roles created:
| Role | Purpose |
|---|---|
| `TRANSFORM_ROLE` | dbt and Airflow -- read RAW, write STAGING/INTERMEDIATE/MARTS |
| `ANALYTICS_ROLE` | Dashboards and analysts -- read-only on all schemas |
The dbt project lives in `dbt_project/` and connects to Snowflake via `profiles.yml`.
```bash
# Install dbt packages (dbt_utils, dbt_expectations)
make dbt-deps

# Run all models
make dbt-run

# Run tests
make dbt-test

# Build (run + test in DAG order)
make dbt-build

# Generate and serve documentation
make dbt-docs
```

Profile targets:
| Target | Schema | Use case |
|---|---|---|
| `dev` | `DEV` | Local development |
| `prod` | `PROD` | Production deployment |
| `ci` | `CI_PR_<pull_request#>` | GitHub Actions CI builds |
Airflow runs via Docker Compose and manages the end-to-end DAG:
```bash
# Start Airflow (webserver + scheduler + Postgres)
make airflow-up

# Access the UI at http://localhost:8080
#   Username: admin
#   Password: admin

# Stop Airflow
make airflow-down
```

The interactive dashboard connects to the Snowflake marts layer and visualizes:
- Revenue trends and order volume over time
- Customer geographic distribution across Brazilian states
- Product category performance
- Seller metrics and delivery time analysis
- Payment method breakdowns
```bash
# Run locally
make dashboard

# Or via Docker Compose
docker-compose up streamlit
```

The dashboard supports a demo mode that renders with sample data when Snowflake credentials are not configured.
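The demo-mode fallback can be sketched as a small helper that checks for the required credentials. The function name and the exact set of variables checked are assumptions; the real check lives in `dashboard/app.py`.

```python
import os

def resolve_data_source(env=None):
    """Return "snowflake" when credentials are configured, else "demo".

    Hypothetical helper illustrating the demo-mode fallback; the exact
    variables the dashboard requires may differ.
    """
    env = os.environ if env is None else env
    required = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")
    if all(env.get(var) for var in required):
        return "snowflake"
    return "demo"
```

With this pattern the Streamlit app can always render: it branches on the return value and loads bundled sample data in the `"demo"` case.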
Terraform provisions all cloud resources: S3 buckets with lifecycle policies, IAM roles for Snowflake-to-S3 access, and Snowflake database objects.
```bash
cd infrastructure

# Initialize providers
terraform init

# Preview changes
terraform plan -var-file=environments/dev.tfvars

# Apply
terraform apply -var-file=environments/dev.tfvars
```

Modules:
| Module | Resources Created |
|---|---|
| `s3` | Raw and processed data buckets, lifecycle rules |
| `iam` | Cross-account role for Snowflake S3 access |
| `snowflake` | Database, schemas, warehouses, stages |
The dbt project follows a three-layer medallion architecture:
Materialized as views. One-to-one mapping with raw source tables. Responsibilities:
- Rename columns for consistency (e.g., `order_delivered_customer_date` -> `delivered_customer_at`)
- Cast data types (`string` -> `timestamp_ntz`, `numeric(16,2)`)
- Standardize strings (uppercase city/state names)
- Filter invalid records
- Coalesce nulls in dimension attributes
| Model | Source Table | Key Columns |
|---|---|---|
| `stg_orders` | `raw.orders` | order_id, customer_id, order_status, timestamps |
| `stg_order_items` | `raw.order_items` | order_id, product_id, seller_id, price, freight_value |
| `stg_customers` | `raw.customers` | customer_id, customer_unique_id, city, state |
| `stg_products` | `raw.products` | product_id, product_category_name, dimensions |
| `stg_sellers` | `raw.sellers` | seller_id, city, state |
| `stg_payments` | `raw.payments` | order_id, payment_type, payment_value |
| `stg_reviews` | `raw.reviews` | review_id, order_id, review_score |
| `stg_geolocation` | `raw.geolocation` | zip_code_prefix, latitude, longitude |
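The rename-and-cast responsibilities of the staging layer can be illustrated in pure Python. This is a sketch only: the real staging models are dbt SQL views, and the `purchased_at` rename and the timestamp format are assumptions beyond the `delivered_customer_at` example given above.

```python
from datetime import datetime

# Hypothetical rename map in the spirit of stg_orders
RENAMES = {
    "order_purchase_timestamp": "purchased_at",          # assumed rename
    "order_delivered_customer_date": "delivered_customer_at",
}

def stage_order(raw):
    """Pure-Python sketch of a staging transform (the real models are dbt SQL)."""
    # Rename columns for consistency
    row = {RENAMES.get(k, k): v for k, v in raw.items()}
    # Cast timestamp strings (Olist raw CSVs use 'YYYY-MM-DD HH:MM:SS')
    for col in ("purchased_at", "delivered_customer_at"):
        if isinstance(row.get(col), str):
            row[col] = datetime.strptime(row[col], "%Y-%m-%d %H:%M:%S")
    return row
```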
Materialized as ephemeral (CTEs inlined into downstream models). Business logic joins and enrichments:
- `int_order_enriched` -- Join orders with items, payments, and reviews
- `int_customer_orders` -- Aggregate order history per customer
- `int_seller_performance` -- Seller-level delivery and revenue metrics
Materialized as tables. Star-schema models optimized for dashboard queries:
- `fct_orders` -- Grain: one row per order with total revenue, items, delivery times
- `fct_order_items` -- Grain: one row per line item with denormalized product/seller attributes
- `dim_customers` -- Customer dimension with geographic attributes and order history
- `dim_products` -- Product dimension with category and physical attributes
- `dim_sellers` -- Seller dimension with location and performance metrics
- `dim_dates` -- Date dimension for time-series analysis
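The `fct_orders` grain can be illustrated with a pure-Python aggregation over line items. The real model is dbt SQL; whether "total revenue" includes freight is an assumption here.

```python
from collections import defaultdict

def build_fct_orders(order_items):
    """Sketch of the fct_orders grain: aggregate line items to one row
    per order (illustrative; the actual model is a dbt SQL table)."""
    facts = defaultdict(lambda: {"total_revenue": 0.0, "item_count": 0})
    for item in order_items:
        fact = facts[item["order_id"]]
        # Assumption: revenue = item price + freight charged to the customer
        fact["total_revenue"] += item["price"] + item["freight_value"]
        fact["item_count"] += 1
    return {oid: dict(f) for oid, f in facts.items()}
```

A quick check of the grain: the number of output rows must equal the number of distinct `order_id` values in the input, which is exactly what the `unique` test on `fct_orders.order_id` enforces downstream.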
```bash
# Run all ingestion tests
make test

# With coverage
pytest ingestion/tests/ -v --cov=ingestion/src --cov-report=term-missing
```

The project uses both generic and schema tests defined in `_staging__models.yml`:
| Test Type | Coverage |
|---|---|
| `unique` | Primary keys (order_id, product_id, etc.) |
| `not_null` | Required columns across all staging models |
| `accepted_values` | order_status, payment_type, review_score |
| `relationships` | Foreign keys (orders -> customers, items -> orders, etc.) |
| `dbt_expectations` | Advanced data quality checks via the dbt_expectations package |
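The semantics of the generic tests above can be sketched in pure Python. These are illustrative equivalents only; in the project the tests are declared in YAML and executed by dbt against Snowflake.

```python
def check_unique(rows, column):
    """dbt-style `unique` test: no non-null value appears twice."""
    values = [r[column] for r in rows if r.get(column) is not None]
    return len(values) == len(set(values))

def check_not_null(rows, column):
    """dbt-style `not_null` test: every row has a value."""
    return all(r.get(column) is not None for r in rows)

def check_accepted_values(rows, column, accepted):
    """dbt-style `accepted_values` test: values come from a fixed set."""
    return all(r.get(column) in accepted for r in rows)
```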
```bash
# Run all dbt tests
make dbt-test

# Run tests for a specific model
cd dbt_project && dbt test --select stg_orders
```

```bash
# Check style
make lint

# Auto-format
make format
```

The project uses two GitHub Actions workflows:
Triggered on PRs that modify `dbt_project/**`.
| Step | Description |
|---|---|
| Checkout + Python setup | Full git history for state comparison |
| `dbt deps` | Install dbt_utils and dbt_expectations packages |
| `dbt compile` | Validate SQL syntax and ref/source resolution |
| `dbt build` (modified+) | Build only changed models + downstream dependencies |
| `dbt test` | Run all generic and singular tests |
| Schema cleanup | Drop the ephemeral `CI_PR_<number>` schema |
Each PR gets an isolated Snowflake schema (`CI_PR_42`, `CI_PR_87`, etc.) so parallel PRs never conflict.
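The per-PR schema naming convention is simple enough to capture in a small helper. This is a hypothetical sketch of the convention, not the actual CI code, which derives the schema name inside the workflow.

```python
def ci_schema_name(pr_number):
    """Build the isolated per-PR schema name used by dbt CI (e.g., CI_PR_42).

    Hypothetical helper; the real workflow composes this string from the
    GitHub Actions pull request context.
    """
    if not isinstance(pr_number, int) or pr_number <= 0:
        raise ValueError("pr_number must be a positive integer")
    return f"CI_PR_{pr_number}"
```

Because the schema name embeds the PR number, two PRs building concurrently write to disjoint schemas, and the cleanup step only has to drop the one schema for its own PR.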
Triggered on push to main.
```
lint-and-test ──> terraform-apply ──> dbt-deploy ──> deploy-dashboard
```

| Job | Description |
|---|---|
| lint-and-test | flake8 + black + pytest on ingestion code |
| terraform-plan | Plan-only on PRs (for review) |
| terraform-apply | Apply infrastructure changes (requires approval) |
| dbt-deploy | dbt run + dbt test + dbt docs generate |
| deploy-dashboard | Build and push the Streamlit Docker image |
The terraform-apply, dbt-deploy, and deploy-dashboard jobs use the production GitHub environment, which can be configured with required reviewers and deployment protection rules.
| Secret | Description |
|---|---|
| `SNOWFLAKE_ACCOUNT` | Snowflake account identifier |
| `SNOWFLAKE_USER` | Snowflake username |
| `SNOWFLAKE_PASSWORD` | Snowflake password |
| `SNOWFLAKE_ROLE` | Snowflake role (e.g., TRANSFORM_ROLE) |
| `SNOWFLAKE_WAREHOUSE` | Snowflake warehouse name |
| `SNOWFLAKE_DATABASE` | Snowflake database name |
| `AWS_ACCESS_KEY_ID` | AWS IAM access key |
| `AWS_SECRET_ACCESS_KEY` | AWS IAM secret key |
| `AWS_REGION` | AWS region (e.g., us-east-1) |
| `DOCKER_REGISTRY` | Container registry URL |
| `DOCKER_USERNAME` | Registry username |
| `DOCKER_PASSWORD` | Registry password or token |
| Layer | Mechanism | Details |
|---|---|---|
| Ingestion | PySpark row-count logging | Input vs. output row counts per table |
| Warehouse | Snowflake source freshness | warn_after: 24h, error_after: 48h |
| dbt | Schema tests + dbt_expectations | Uniqueness, not-null, accepted values, ranges |
| Airflow | DAG-level email alerts | On task failure and SLA miss |
| Dashboard | Streamlit health check | Docker HEALTHCHECK on /_stcore/health |
| CI/CD | GitHub Actions status checks | Required to pass before merge |
Contributions are welcome. Please follow these guidelines:
- Fork the repository and create a feature branch from `main`.
- Write tests for any new ingestion logic or dbt models.
- Follow the style guide:
  - Python: Black formatter (line length 120), flake8 linting
  - SQL: Lowercase keywords, trailing commas, CTEs over subqueries
  - Commits: Conventional Commits format (`feat:`, `fix:`, `docs:`, etc.)
- Run the checks locally before pushing:

  ```bash
  make lint
  make test
  make dbt-build
  ```

- Open a Pull Request against `main` with a clear description of changes.
- CI must pass before the PR can be merged.
```bash
# Install all dependencies (including dev tools)
make setup

# Run the full lint + test suite
make lint && make test
```

This project is licensed under the MIT License.
```
MIT License

Copyright (c) 2024 DataFlow Pro

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
```

- Dataset: Brazilian E-Commerce Public Dataset by Olist (CC BY-NC-SA 4.0)
- dbt Packages: dbt_utils by dbt Labs, dbt_expectations by Calogica