kahramanmurat/dataflow-ecommerce

DataFlow Pro: E-Commerce Analytics Pipeline

An end-to-end data engineering pipeline built on the Brazilian E-Commerce (Olist) public dataset.

This project ingests ~100K orders from Kaggle, transforms them through a modern data stack (PySpark, S3, Snowflake, dbt), orchestrates with Airflow, and surfaces insights in a Streamlit dashboard -- all provisioned with Terraform and validated through CI/CD.


Architecture

DataFlow Pro -- System Architecture
============================================================================

+----------+     +-----------+     +----------+     +----------+     +------------+
|          |     |           |     |          |     |          |     |            |
|  Kaggle  | --> |  PySpark  | --> |  AWS S3  | --> | Snowflake| --> | Streamlit  |
| (source) |     | (ingest)  |     |  (lake)  |     |  (DWH)   |     | (dashboard)|
|          |     |           |     |          |     |          |     |            |
+----------+     +-----------+     +----------+     +----+-----+     +------------+
                                                         |
                                                    +----+-----+
                                                    |          |
                                                    |   dbt    |
                                                    | (models) |
                                                    |          |
                                                    +----------+

Orchestration:  Apache Airflow
Infrastructure: Terraform
CI/CD:          GitHub Actions
Monitoring:     dbt tests + Airflow alerts
============================================================================

Data flow in detail:

  1. Extract -- Download 9 CSV files (~100K orders, 8 entity tables) from Kaggle via the API.
  2. Transform & Load (Ingestion) -- PySpark reads raw CSVs, applies schema validation, deduplication, timestamp casting, and string standardization, then writes clean Parquet files to S3.
  3. Load into Warehouse -- Snowflake COPY INTO loads Parquet from the S3 stage into the RAW schema.
  4. Model (dbt) -- Three-layer dbt project (staging, intermediate, marts) builds a clean analytics-ready star schema.
  5. Serve -- Streamlit dashboard queries the marts layer to visualize KPIs, trends, and geographic distribution.

Tech Stack

Layer Technology Purpose
Source Data Kaggle API Download the Olist Brazilian E-Commerce dataset
Ingestion PySpark 3.5 Schema enforcement, cleaning, Parquet conversion
Storage AWS S3 Data lake (raw + processed zones)
Warehouse Snowflake Cloud data warehouse with RBAC
Transformation dbt-core 1.8 SQL-based ELT modeling (staging/intermediate/marts)
Orchestration Apache Airflow 2.9 DAG scheduling and pipeline monitoring
Visualization Streamlit 1.35 + Plotly Interactive analytics dashboard
Infrastructure Terraform 1.5+ IaC for S3, IAM, and Snowflake resources
CI/CD GitHub Actions Automated testing, linting, deployment
Containerization Docker + Docker Compose Reproducible local development
Testing pytest, dbt tests, Great Expectations (dbt_expectations) Data quality at every layer

Project Structure

ecommerce/
|-- .github/
|   `-- workflows/
|       |-- dbt_ci.yml               # dbt CI on pull requests
|       `-- deploy.yml               # Full deployment pipeline
|
|-- dashboard/
|   |-- app.py                       # Streamlit application entrypoint
|   |-- requirements.txt             # Dashboard Python dependencies
|   `-- utils/
|       `-- snowflake_connector.py   # Snowflake connection manager
|
|-- data/
|   `-- .gitkeep                     # Placeholder (raw CSVs downloaded here)
|
|-- dbt_project/
|   |-- dbt_project.yml              # dbt project configuration
|   |-- packages.yml                 # dbt packages (dbt_utils, dbt_expectations)
|   |-- profiles.yml                 # Connection profiles (dev, prod, ci)
|   `-- models/
|       `-- staging/
|           |-- _staging__sources.yml
|           |-- _staging__models.yml
|           |-- stg_orders.sql
|           |-- stg_order_items.sql
|           |-- stg_customers.sql
|           |-- stg_products.sql
|           |-- stg_sellers.sql
|           |-- stg_payments.sql
|           |-- stg_reviews.sql
|           `-- stg_geolocation.sql  # (referenced in models.yml)
|
|-- infrastructure/
|   |-- main.tf                      # Root Terraform module
|   |-- variables.tf                 # Input variables
|   |-- outputs.tf                   # Output values
|   |-- environments/                # Per-environment tfvars
|   `-- modules/
|       |-- s3/                      # S3 buckets (raw + processed)
|       |-- iam/                     # Cross-service IAM roles
|       `-- snowflake/               # Snowflake DB objects via Terraform
|
|-- ingestion/
|   |-- notebooks/
|   |   `-- 01_ingest_csv.py         # PySpark batch ingestion script
|   |-- src/
|   |   |-- schemas.py               # PySpark StructType schemas for all 8 tables
|   |   |-- transformations.py       # Reusable transform functions
|   |   `-- utils.py                 # Spark session, path helpers, Parquet writer
|   `-- tests/
|       `-- __init__.py
|
|-- orchestration/
|   |-- dags/                        # Airflow DAG definitions
|   |-- plugins/                     # Custom Airflow operators
|   `-- config/                      # Airflow configuration overrides
|
|-- scripts/
|   |-- download_dataset.py          # Kaggle dataset downloader
|   |-- run_pipeline.sh              # End-to-end pipeline runner
|   `-- setup_env.sh                 # Environment bootstrap script
|
|-- snowflake/
|   `-- setup/
|       |-- 01_databases.sql         # Create ECOMMERCE database
|       |-- 02_schemas.sql           # Create RAW/STAGING/INTERMEDIATE/MARTS + RBAC
|       |-- 03_warehouses.sql        # Create compute warehouses
|       |-- 04_stages.sql            # Create S3 external stages
|       `-- 05_file_formats.sql      # Create CSV/Parquet file formats
|
|-- .env.example                     # Template for environment variables
|-- .gitignore
|-- docker-compose.yml               # Airflow + Streamlit local stack
|-- Dockerfile.airflow
|-- Dockerfile.streamlit
|-- Makefile                         # Developer task runner
|-- requirements.txt                 # Core Python dependencies
`-- requirements-dev.txt             # Development/testing dependencies

Prerequisites

Requirement Version Notes
Python 3.11+ Used for PySpark ingestion, dbt, and Streamlit
Docker 24+ Required for Airflow and containerized deployments
Docker Compose 2.20+ Multi-service orchestration
Snowflake Account -- Free trial at signup.snowflake.com
AWS Account -- S3 bucket for the data lake layer
Kaggle API Key -- Download from kaggle.com/settings
Terraform 1.5+ Only if provisioning infrastructure via IaC

Quick Start

# 1. Clone the repository
git clone https://github.com/<your-username>/dataflow-pro-ecommerce.git
cd dataflow-pro-ecommerce

# 2. Run the automated setup script
bash scripts/setup_env.sh

# 3. Configure credentials
cp .env.example .env
# Edit .env with your Snowflake, AWS, and Kaggle credentials

# 4. Activate the virtual environment
source .venv/bin/activate

# 5. Download the Olist dataset from Kaggle
make download

# 6. Run the full pipeline (ingest -> transform -> test)
make pipeline

# 7. Launch the Streamlit dashboard
make dashboard

Tip: Run make help to see all available commands.


Detailed Setup

1. Data Ingestion (PySpark)

The ingestion layer reads the raw Olist CSV files (8 entity tables), enforces PySpark schemas, and writes clean Parquet output.

# Ingest all tables
make ingest

# Or run the PySpark script directly with options
python ingestion/notebooks/01_ingest_csv.py \
  --input-dir data/raw \
  --output-dir data/processed \
  --tables orders customers products

What the ingestion pipeline does:

  • Applies predefined StructType schemas for type safety
  • Cleans column names (lowercase, strip whitespace)
  • Removes duplicate rows
  • Casts timestamp strings to proper TimestampType
  • Standardizes city/state strings (trim + lowercase)
  • Filters rows with null primary keys
  • Appends _ingested_at and _source_file audit columns
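The cleaning rules above can be illustrated in plain Python. This is a hypothetical sketch: the project's actual helpers live in ingestion/src/transformations.py and operate on PySpark DataFrames, so the function names and signatures here are illustrative only.

```python
from typing import Optional

# Hypothetical, pure-Python versions of the ingestion cleaning rules;
# the real implementations in transformations.py work on Spark columns.

def clean_column_name(name: str) -> str:
    """Lowercase a column name and strip surrounding whitespace."""
    return name.strip().lower()

def standardize_location(value: Optional[str]) -> Optional[str]:
    """Trim and lowercase city/state strings; pass nulls through."""
    if value is None:
        return None
    return value.strip().lower()

def drop_null_keys(rows: list, key: str) -> list:
    """Filter out rows whose primary key is missing."""
    return [r for r in rows if r.get(key) is not None]
```

The same rules applied via Spark would use `withColumnRenamed`, `lower(trim(col))`, and a `filter` on the key column, but the intent is identical.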

2. Snowflake Configuration

Run the SQL setup scripts in order against your Snowflake account. These create the database, schemas (RAW, STAGING, INTERMEDIATE, MARTS), warehouses, external stages, file formats, and RBAC roles.

# Review the setup order
make snowflake-setup

# Execute in Snowflake UI or via SnowSQL:
#   snowflake/setup/01_databases.sql
#   snowflake/setup/02_schemas.sql
#   snowflake/setup/03_warehouses.sql
#   snowflake/setup/04_stages.sql
#   snowflake/setup/05_file_formats.sql

Roles created:

Role Purpose
TRANSFORM_ROLE dbt and Airflow -- read RAW, write STAGING/INTERMEDIATE/MARTS
ANALYTICS_ROLE Dashboards and analysts -- read-only on all schemas

3. dbt Project

The dbt project lives in dbt_project/ and connects to Snowflake via profiles.yml.

# Install dbt packages (dbt_utils, dbt_expectations)
make dbt-deps

# Run all models
make dbt-run

# Run tests
make dbt-test

# Build (run + test in DAG order)
make dbt-build

# Generate and serve documentation
make dbt-docs

Profile targets:

Target Schema Use case
dev DEV Local development
prod PROD Production deployment
ci CI_PR_<pull_request#> GitHub Actions CI builds
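The README does not reproduce profiles.yml, but a minimal sketch consistent with the targets table might look like the following (the profile name, warehouse name, and threads value are assumptions; the repo's actual file may differ):

```yaml
# Hypothetical profiles.yml excerpt; only the target names and the
# DEV schema come from the table above.
dbt_project:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: TRANSFORM_ROLE
      database: ECOMMERCE
      warehouse: TRANSFORM_WH   # assumed name
      schema: DEV
      threads: 4
```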

4. Airflow Orchestration

Airflow runs via Docker Compose and manages the end-to-end DAG:

# Start Airflow (webserver + scheduler + Postgres)
make airflow-up

# Access the UI at http://localhost:8080
#   Username: admin
#   Password: admin

# Stop Airflow
make airflow-down

5. Streamlit Dashboard

The interactive dashboard connects to the Snowflake marts layer and visualizes:

  • Revenue trends and order volume over time
  • Customer geographic distribution across Brazilian states
  • Product category performance
  • Seller metrics and delivery time analysis
  • Payment method breakdowns
# Run locally make dashboard # Or via Docker Compose docker-compose up streamlit

The dashboard supports a demo mode that renders with sample data when Snowflake credentials are not configured.
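The demo-mode switch can be as simple as checking for credentials at startup. A hypothetical sketch (the real logic lives in dashboard/app.py and may differ):

```python
import os
from typing import Mapping

# Hypothetical demo-mode check; variable names mirror the required
# GitHub secrets listed later in this README.
REQUIRED_VARS = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")

def use_demo_mode(env: Mapping = os.environ) -> bool:
    """Return True when any Snowflake credential is missing or empty."""
    return not all(env.get(var) for var in REQUIRED_VARS)
```

When `use_demo_mode()` is true, the app would load bundled sample data instead of opening a Snowflake connection.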

6. Terraform Infrastructure

Terraform provisions all cloud resources: S3 buckets with lifecycle policies, IAM roles for Snowflake-to-S3 access, and Snowflake database objects.

cd infrastructure

# Initialize providers
terraform init

# Preview changes
terraform plan -var-file=environments/dev.tfvars

# Apply
terraform apply -var-file=environments/dev.tfvars

Modules:

Module Resources Created
s3 Raw and processed data buckets, lifecycle rules
iam Cross-account role for Snowflake S3 access
snowflake Database, schemas, warehouses, stages
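A call into the s3 module from main.tf might look roughly like this; the module's input variable names are assumptions (see infrastructure/modules/s3/ for the real interface):

```hcl
# Hypothetical module call; only the module path and the raw/processed
# bucket split come from this README.
module "s3" {
  source      = "./modules/s3"
  environment = var.environment

  raw_bucket_name       = "dataflow-raw-${var.environment}"
  processed_bucket_name = "dataflow-processed-${var.environment}"
}
```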

Data Model

The dbt project follows a three-layer medallion architecture:

Staging (stg_*)

Materialized as views. One-to-one mapping with raw source tables. Responsibilities:

  • Rename columns for consistency (e.g., order_delivered_customer_date -> delivered_customer_at)
  • Cast data types (string -> timestamp_ntz, numeric(16,2))
  • Standardize strings (uppercase city/state names)
  • Filter invalid records
  • Coalesce nulls in dimension attributes
Model Source Table Key Columns
stg_orders raw.orders order_id, customer_id, order_status, timestamps
stg_order_items raw.order_items order_id, product_id, seller_id, price, freight_value
stg_customers raw.customers customer_id, customer_unique_id, city, state
stg_products raw.products product_id, product_category_name, dimensions
stg_sellers raw.sellers seller_id, city, state
stg_payments raw.payments order_id, payment_type, payment_value
stg_reviews raw.reviews review_id, order_id, review_score
stg_geolocation raw.geolocation zip_code_prefix, latitude, longitude
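The rename convention (timestamp columns get an _at suffix) can be captured as a simple mapping. Apart from order_delivered_customer_date -> delivered_customer_at, which appears above, the entries below are illustrative guesses:

```python
# Hypothetical rename map in the spirit of the staging convention;
# only the delivered_customer_at entry is taken from this README.
RENAME_MAP = {
    "order_purchase_timestamp": "purchased_at",
    "order_delivered_customer_date": "delivered_customer_at",
    "order_estimated_delivery_date": "estimated_delivery_at",
}

def rename_columns(row: dict) -> dict:
    """Apply the staging rename map, leaving unmapped columns untouched."""
    return {RENAME_MAP.get(k, k): v for k, v in row.items()}
```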

Intermediate (int_*)

Materialized as ephemeral (CTEs inlined into downstream models). Business logic joins and enrichments:

  • int_order_enriched -- Join orders with items, payments, and reviews
  • int_customer_orders -- Aggregate order history per customer
  • int_seller_performance -- Seller-level delivery and revenue metrics

Marts (fct_* / dim_*)

Materialized as tables. Star-schema models optimized for dashboard queries:

  • fct_orders -- Grain: one row per order with total revenue, items, delivery times
  • fct_order_items -- Grain: one row per line item with denormalized product/seller attributes
  • dim_customers -- Customer dimension with geographic attributes and order history
  • dim_products -- Product dimension with category and physical attributes
  • dim_sellers -- Seller dimension with location and performance metrics
  • dim_dates -- Date dimension for time-series analysis

Testing

Python Tests (pytest)

# Run all ingestion tests
make test

# With coverage
pytest ingestion/tests/ -v --cov=ingestion/src --cov-report=term-missing

dbt Tests

The project uses both built-in generic tests and dbt_expectations tests, declared in _staging__models.yml:

Test Type Coverage
unique Primary keys (order_id, product_id, etc.)
not_null Required columns across all staging models
accepted_values order_status, payment_type, review_score
relationships Foreign keys (orders -> customers, items -> orders, etc.)
dbt_expectations Advanced data quality checks via the dbt_expectations package
# Run all dbt tests make dbt-test # Run tests for a specific model cd dbt_project && dbt test --select stg_orders

Linting

# Check style
make lint

# Auto-format
make format

CI/CD

The project uses two GitHub Actions workflows:

dbt_ci.yml -- Pull Request Validation

Triggered on PRs that modify dbt_project/**.

Step Description
Checkout + Python setup Full git history for state comparison
dbt deps Install dbt_utils and dbt_expectations packages
dbt compile Validate SQL syntax and ref/source resolution
dbt build (modified+) Build only changed models + downstream dependencies
dbt test Run all generic and singular tests
Schema cleanup Drop the ephemeral CI_PR_<number> schema

Each PR gets an isolated Snowflake schema (CI_PR_42, CI_PR_87, etc.) so parallel PRs never conflict.
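Deriving the schema name from the PR number that GitHub Actions exposes is trivial; a hypothetical helper (the workflow may well compute this inline instead):

```python
# Hypothetical helper; the CI workflow may build this string inline
# from the github.event.number context instead.
def ci_schema_name(pr_number: int) -> str:
    """Build the isolated Snowflake schema name for a pull request."""
    if pr_number <= 0:
        raise ValueError("PR number must be positive")
    return f"CI_PR_{pr_number}"
```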

deploy.yml -- Production Deployment

Triggered on push to main.

lint-and-test ──> terraform-apply ──> dbt-deploy ──> deploy-dashboard 
Job Description
lint-and-test flake8 + black + pytest on ingestion code
terraform-plan Plan-only on PRs (for review)
terraform-apply Apply infrastructure changes (requires approval)
dbt-deploy dbt run + dbt test + dbt docs generate
deploy-dashboard Build and push the Streamlit Docker image

The terraform-apply, dbt-deploy, and deploy-dashboard jobs use the production GitHub environment, which can be configured with required reviewers and deployment protection rules.

Required GitHub Secrets

Secret Description
SNOWFLAKE_ACCOUNT Snowflake account identifier
SNOWFLAKE_USER Snowflake username
SNOWFLAKE_PASSWORD Snowflake password
SNOWFLAKE_ROLE Snowflake role (e.g., TRANSFORM_ROLE)
SNOWFLAKE_WAREHOUSE Snowflake warehouse name
SNOWFLAKE_DATABASE Snowflake database name
AWS_ACCESS_KEY_ID AWS IAM access key
AWS_SECRET_ACCESS_KEY AWS IAM secret key
AWS_REGION AWS region (e.g., us-east-1)
DOCKER_REGISTRY Container registry URL
DOCKER_USERNAME Registry username
DOCKER_PASSWORD Registry password or token

Monitoring

Layer Mechanism Details
Ingestion PySpark row-count logging Input vs. output row counts per table
Warehouse Snowflake source freshness warn_after: 24h, error_after: 48h
dbt Schema tests + dbt_expectations Uniqueness, not-null, accepted values, ranges
Airflow DAG-level email alerts On task failure and SLA miss
Dashboard Streamlit health check Docker HEALTHCHECK on /_stcore/health
CI/CD GitHub Actions status checks Required to pass before merge
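The Snowflake freshness thresholds above map onto dbt's source freshness configuration. A hypothetical excerpt from _staging__sources.yml (table list abbreviated; the _ingested_at audit column comes from the ingestion section):

```yaml
# Illustrative excerpt; thresholds match the monitoring table.
sources:
  - name: raw
    database: ECOMMERCE
    schema: RAW
    loaded_at_field: _ingested_at
    freshness:
      warn_after: {count: 24, period: hour}
      error_after: {count: 48, period: hour}
    tables:
      - name: orders
      - name: order_items
```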

Contributing

Contributions are welcome. Please follow these guidelines:

  1. Fork the repository and create a feature branch from main.
  2. Write tests for any new ingestion logic or dbt models.
  3. Follow the style guide:
    • Python: Black formatter (line length 120), flake8 linting
    • SQL: Lowercase keywords, trailing commas, CTEs over subqueries
    • Commits: Conventional Commits format (feat:, fix:, docs:, etc.)
  4. Run the checks locally before pushing:
    make lint
    make test
    make dbt-build
  5. Open a Pull Request against main with a clear description of changes.
  6. CI must pass before the PR can be merged.

Development Setup

# Install all dependencies (including dev tools)
make setup

# Run the full lint + test suite
make lint && make test

License

This project is licensed under the MIT License.

MIT License

Copyright (c) 2024 DataFlow Pro

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Acknowledgments

  • Olist and Kaggle for publishing the Brazilian E-Commerce public dataset
