AI Pipeline Orchestration Library for Elixir
A robust, production-ready library for chaining AI providers (Claude and Gemini) with advanced features like fault tolerance, session management, and self-improving Genesis pipelines.
π― Library Readiness: 8.5/10 - Ready for immediate use as a Git dependency with comprehensive testing, clean API, and flexible configuration.
π Remaining 1.5/10 for Full Production Readiness
Missing Features (1.5/10):
- Hex Package Publication (0.5/10) - Currently Git-only, needs
mix hex.publishworkflow - Enhanced Documentation (0.3/10) - ExDoc polish, API reference examples, getting started guide
- Backward Compatibility (0.2/10) - Semantic versioning strategy, deprecation warnings, migration guides
- Performance Benchmarks (0.2/10) - Baseline metrics, memory profiling, concurrency benchmarks
- Production Hardening (0.3/10) - Rate limiting, circuit breakers, structured logging with correlation IDs
Current Status: 8.5/10 = Excellent for Git dependency | 10/10 = Enterprise-ready Hex package
NEW: Our flagship feature - a pipeline that generates pipelines! The Genesis Pipeline is an AI system that creates other AI pipelines, enabling true self-improvement and evolution.
# Generate a new AI pipeline with one command mix pipeline.generate.live "Create a sentiment analysis pipeline" # The system will create a complete, executable pipeline in evolved_pipelines/ # Run your generated pipeline immediately: mix pipeline.run evolved_pipelines/sentiment_analyzer_*.yamlWhat just happened? The Genesis Pipeline used Claude to analyze your request, design the optimal pipeline structure, and generate a complete YAML configuration that's immediately ready to execute.
Use pipeline_ex as a dependency in your Elixir applications:
# mix.exs defp deps do [ # From Hex.pm (recommended) {:pipeline_ex, "~> 0.1.1"} # Or from GitHub # {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.1.1"} ] end
β οΈ Breaking Change in v0.1.0 Async streaming system removed. ClaudeCodeSDK already streams messages optimally - the custom buffering system added unnecessary complexity. Seedocs/ASYNC_STREAMING_DEPRECATION.mdfor details.
# Load and execute a pipeline {:ok, config} = Pipeline.load_workflow("my_analysis.yaml") {:ok, results} = Pipeline.execute(config) # Execute with custom configuration {:ok, results} = Pipeline.execute(config, workspace_dir: "/app/ai_workspace", output_dir: "/app/pipeline_outputs" ) # Convenience function {:ok, results} = Pipeline.run("my_analysis.yaml", debug: true) # Health check case Pipeline.health_check() do :ok -> IO.puts("Pipeline system ready") {:error, issues} -> IO.puts("Issues: #{inspect(issues)}") endThe library supports flexible configuration through multiple sources:
-
Function options (highest priority):
Pipeline.execute(config, workspace_dir: "/custom/workspace")
-
Environment variables:
export PIPELINE_WORKSPACE_DIR="/app/workspace" export PIPELINE_OUTPUT_DIR="/app/outputs" export PIPELINE_CHECKPOINT_DIR="/app/checkpoints"
-
YAML configuration and defaults
# Phoenix controller defmodule MyAppWeb.AIController do def analyze(conn, %{"code" => code}) do case Pipeline.run("pipelines/code_analysis.yaml", workspace_dir: "/tmp/ai_workspace") do {:ok, %{"analysis" => result}} -> json(conn, %{analysis: result}) {:error, reason} -> put_status(conn, 500) |> json(%{error: reason}) end end end # Background job with Oban defmodule MyApp.AnalysisWorker do use Oban.Worker, queue: :ai_analysis def perform(%Oban.Job{args: %{"project_id" => project_id}}) do case Pipeline.execute(get_analysis_config(), workspace_dir: "/tmp/analysis_#{project_id}", output_dir: "/app/results/#{project_id}") do {:ok, results} -> MyApp.Projects.update_analysis(project, results) :ok {:error, reason} -> {:error, reason} end end end# Enable mock mode for development/testing Application.put_env(:pipeline, :test_mode, true) # All AI calls will be mocked {:ok, results} = Pipeline.execute(config)π Complete Library Guide: See the documentation sections below for detailed usage instructions, configuration options, and integration patterns.
π Advanced Features: See ADVANCED_FEATURES.md for comprehensive documentation on loops, complex conditions, file operations, data transformation, codebase intelligence, and state management.
π YAML Format v2 Documentation: See docs/20250704_yaml_format_v2/index.md for complete reference documentation on the Pipeline YAML format, including all step types, prompt systems, control flow, and best practices.
- ποΈ Elixir Library: Use as a dependency in any Elixir application
- π§ Clean API: Simple
Pipeline.execute/2andPipeline.load_workflow/1functions - βοΈ Configurable: All paths and settings customizable via options/environment variables
- π§ͺ Mock Mode: Complete testing support without API costs
- π₯ Health Checks: Built-in system validation and monitoring
- π€ Multi-AI Integration: Chain Claude and Gemini APIs together
- π° Model Selection & Cost Control: Choose between Claude models (sonnet ~$0.01 vs opus ~$0.26 per query)
- π Flexible Execution Modes: Mock, Live, and Mixed modes for testing
- π YAML Workflow Configuration: Define complex multi-step workflows
- π― Structured Output: JSON-based responses with proper error handling
- π§ InstructorLite Integration: Structured generation with Gemini
- π Result Management: Organized output storage and display
- Enhanced Claude Steps: Smart presets, sessions, extraction, batch processing, robust error handling
- Model Selection: Automatic cost optimization (development=sonnet, production=opus+fallback, analysis=opus)
- Genesis Pipeline: Self-improving AI system that generates other pipelines
- Session Management: Persistent conversations with automatic checkpointing
- Fault Tolerance: Retry mechanisms, circuit breakers, graceful degradation
- Loop Constructs: For/while loops with parallel execution and nested support
- Complex Conditions: Boolean logic, comparisons, mathematical expressions
- File Operations: Copy, move, validate, convert with format transformations
- Data Transformation: Filter, aggregate, join with schema validation
- Codebase Intelligence: Project discovery, code analysis, dependency mapping
- State Management: Variables, interpolation, checkpoints with persistence
- Async Streaming: Real-time response streaming for Claude steps with multiple handlers
π See ADVANCED_FEATURES.md for detailed documentation and examples of all advanced capabilities.
# mix.exs defp deps do [ # From Hex.pm (recommended) {:pipeline_ex, "~> 0.0.1"} # Or from GitHub # {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.0.1"} ] endgit clone <repository> cd pipeline_ex mix deps.getπ Start with one of these commands:
# Mock mode (safe, free, fast) mix showcase # Complete demo with mocks # Live mode (real API calls, costs money) mix showcase --live # Complete demo with live APIsAll examples and tests can run in two modes:
| Mode | Command Format | API Calls | Costs | Authentication Required |
|---|---|---|---|---|
| Mock | mix showcase | None (mocked) | $0.00 | No |
| Live | mix showcase --live | Real API calls | Real costs | Yes (claude login) |
mix test # All tests with mocked APIs (fast, free)# Setup authentication first: export GEMINI_API_KEY="your_api_key" claude login # Run tests with real APIs: mix pipeline.test.live # Only integration tests with live APIs (costs money)The pipeline supports three execution modes controlled by the TEST_MODE environment variable:
| Mode | Description | Use Case |
|---|---|---|
mock | Uses fake responses | Development, unit testing, CI/CD |
live | Uses real API calls | Production, integration testing |
mixed | Mocks for unit tests, live for integration | Hybrid testing approach |
# Mock mode - fast, no API costs mix pipeline.run examples/comprehensive_config_example.yaml # Live mode - real AI responses mix pipeline.run.live examples/comprehensive_config_example.yaml # Mixed mode - context-dependent TEST_MODE=mixed mix testClaude uses the authenticated CLI. Run once:
claude loginSet your API key:
export GEMINI_API_KEY="your_gemini_api_key_here"Or in your application config:
config :pipeline, gemini_api_key: "your_api_key"Create YAML workflow files like test_simple_workflow.yaml:
workflow: name: "simple_test_workflow" description: "Test basic claude functionality" steps: - name: "analyze_code" type: "claude" prompt: - type: "static" content: | Analyze this simple Python function and provide feedback: def add(a, b): return a + b Please provide your analysis in JSON format. - name: "plan_improvements" type: "gemini" prompt: - type: "static" content: | Based on the previous analysis, create a plan to improve the function. Consider error handling, type hints, and documentation.defmodule MyApp.AIProcessor do @doc "Analyze code using the pipeline library" def analyze_code(code_content) do # Load your pipeline configuration case Pipeline.load_workflow("pipelines/code_analysis.yaml") do {:ok, config} -> # Execute with custom workspace Pipeline.execute(config, workspace_dir: "/tmp/ai_workspace", debug: true ) {:error, reason} -> {:error, "Failed to load workflow: #{reason}"} end end @doc "Health check for the AI system" def system_ready? do case Pipeline.health_check() do :ok -> true {:error, _issues} -> false end end end # Usage in your application {:ok, analysis} = MyApp.AIProcessor.analyze_code(user_code) IO.inspect(analysis["analysis_step"])#!/usr/bin/env elixir Mix.install([ {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git"} ]) # Load and execute pipeline case Pipeline.run("test_simple_workflow.yaml", output_dir: "outputs") do {:ok, results} -> IO.puts("β
Pipeline completed!") IO.inspect(results) {:error, reason} -> IO.puts("β Pipeline failed: #{reason}") end# Test with mock responses (fast) TEST_MODE=mock elixir run_example.exs # Test with real Claude + mock Gemini # (useful when you have Claude access but no Gemini API key) TEST_MODE=mixed elixir -e " System.put_env(\"FORCE_MOCK_GEMINI\", \"true\") Code.compile_file(\"run_example.exs\") " # Full live test (requires both API keys) elixir run_example.exsThe project includes a comprehensive configuration example that demonstrates all available features with minimal steps. This example showcases every configuration option, step type, and feature available in the pipeline system.
# Run the comprehensive example with mocked AI responses mix pipeline.run examples/comprehensive_config_example.yaml # Run with debug output to see detailed execution PIPELINE_DEBUG=true mix pipeline.run examples/comprehensive_config_example.yamlTo run the comprehensive example with actual AI providers:
# Set your Gemini API key (get from https://aistudio.google.com/) export GEMINI_API_KEY="your_gemini_api_key_here" # Authenticate Claude CLI (pre-authenticated, no API key needed) claude auth# Run with real AI providers mix pipeline.run.live examples/comprehensive_config_example.yaml # Run with full debug logging PIPELINE_DEBUG=true PIPELINE_LOG_LEVEL=debug mix pipeline.run.live examples/comprehensive_config_example.yamlThe examples/comprehensive_config_example.yaml shows:
- β
Basic step types:
gemini,claude,parallel_claude,gemini_instructor - β
Enhanced Claude steps:
claude_smart,claude_session,claude_extract,claude_batch,claude_robust - β Function calling: Gemini with structured function definitions
- β All Claude tools: Write, Edit, Read, Bash, Search, Glob, Grep
- β Parallel execution: Multiple Claude instances running simultaneously
- β Conditional steps: Steps that run based on previous results
- β All prompt types: Static content, file content, previous responses
- β Workspace management: Sandboxed file operations
- β Token budgets: Fine-tuned AI response configurations
- β Model selection: Different AI models for different tasks
- β Output management: Structured result saving and organization
The pipeline includes five advanced Claude step types that extend the basic claude step with specialized capabilities:
Intelligent preset-based configuration with environment awareness.
- Presets:
development,production,analysis,chat,test - Auto-optimization: Preset-specific tool restrictions and performance tuning
- Environment detection: Automatic preset selection based on Mix environment
- name: "smart_analysis" type: "claude_smart" preset: "analysis" # Uses analysis-optimized settings prompt: [...]Persistent conversation management with session state.
- Session persistence: Continue conversations across multiple steps
- Automatic checkpointing: Save session state for recovery
- Turn management: Configurable conversation length limits
- name: "session_start" type: "claude_session" session_name: "math_tutor" session_config: persist: true max_turns: 50Advanced content extraction and post-processing.
- Output formats:
json,markdown,structured,summary - Post-processing: Extract code blocks, recommendations, links, key points
- Content filtering: Apply extraction rules and transformations
- name: "extract_json" type: "claude_extract" preset: "analysis" extraction_config: format: "json" post_processing: ["extract_code_blocks", "extract_recommendations"] include_metadata: trueParallel task execution with load balancing.
- Concurrent processing: Run multiple Claude queries simultaneously
- Task management: Queue and execute independent tasks
- Performance scaling: Configurable parallelism limits
- name: "batch_analysis" type: "claude_batch" batch_config: max_parallel: 3 tasks: - id: "task1" prompt: "Analyze JavaScript code..." - id: "task2" prompt: "Analyze Python code..."Enterprise-grade error recovery and fault tolerance.
- Retry mechanisms: Configurable backoff strategies
- Fallback actions: Graceful degradation options
- Circuit breaker: Prevent cascade failures
- name: "robust_analysis" type: "claude_robust" retry_config: max_retries: 3 backoff_strategy: "exponential" fallback_action: "simplified_prompt"Each enhanced step type has dedicated example files for testing:
# Test individual enhanced step types mix pipeline.run.live examples/claude_smart_example.yaml mix pipeline.run.live examples/claude_session_example.yaml mix pipeline.run.live examples/claude_extract_example.yaml mix pipeline.run.live examples/claude_batch_example.yaml mix pipeline.run.live examples/claude_robust_example.yaml # Or run all enhanced examples in mock mode (free) mix pipeline.run examples/claude_smart_example.yaml mix pipeline.run examples/claude_session_example.yaml # ... etcThe pipeline now supports real-time message streaming for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience.
- Real-time feedback: See Claude's complete messages as they arrive (message-by-message)
- Lower memory usage: Process messages without buffering entire responses
- Better UX: Progressive display of assistant responses, tool uses, and results
- Visibility: Watch Claude's tool usage and thinking process in real-time
- name: "streaming_claude" type: "claude" claude_options: # Enable async streaming async_streaming: true stream_handler: "console" # Real-time console output stream_buffer_size: 100 # Message buffer size # Standard options max_turns: 10 allowed_tools: ["Write", "Read", "Edit"]-
Console Handler (
console) - Real-time terminal outputstream_handler: "console" stream_console_config: show_timestamps: true color_output: true show_progress: true
-
File Handler (
file) - Stream to file with rotationstream_handler: "file" stream_file_path: "./outputs/stream.jsonl" stream_file_rotation: enabled: true max_size_mb: 10 max_files: 5
-
Buffer Handler (
buffer) - Collect in memory with statsstream_handler: "buffer" stream_buffer_config: max_size: 1000 circular: true deduplication: true
-
Callback Handler (
callback) - Custom processingstream_handler: "callback" stream_callback_config: filter_types: ["text", "tool_use"] rate_limit: 10
# Simple streaming example mix pipeline.run examples/clean_streaming_numbers.yaml # Multi-message streaming with file operations mix pipeline.run examples/streaming_file_operations.yaml # Run streaming tests mix test test/integration/async_streaming_test.exsπ Complete Streaming Guide: See examples/STREAMING_GUIDE.md for implementation details and docs/ASYNC_STREAMING_MIGRATION_GUIDE.md for adding streaming to existing pipelines.
Async streaming is supported by all Claude-based steps:
claude- Basic Claude stepclaude_smart- With presetsclaude_session- With session continuityclaude_extract- With extractionclaude_batch- Parallel streamingclaude_robust- With error recoveryparallel_claude- Multiple streams
For advanced configuration, you can set these environment variables:
# API Configuration export GEMINI_API_KEY="your_gemini_api_key" # Note: Claude uses CLI authentication (claude auth), no API key needed # Pipeline Directories export PIPELINE_WORKSPACE_DIR="./workspace" # Claude's sandbox export PIPELINE_OUTPUT_DIR="./outputs" # Result storage export PIPELINE_CHECKPOINT_DIR="./checkpoints" # State management # Logging and Debug export PIPELINE_LOG_LEVEL="debug" # debug, info, warn, error export PIPELINE_DEBUG="true" # Detailed execution logs # Execution Mode export TEST_MODE="live" # live, mock, mixed- Start with the example: Copy
examples/comprehensive_config_example.yaml - Read the guides:
- Pipeline Configuration Guide for basic configuration
- Prompt System Guide for advanced prompt management, templates, and reusable components
- Recursive Pipelines Guide for pipeline composition and modular workflows
- ADVANCED_FEATURES.md for loops, conditions, file operations, and more
- TESTING_ARCHITECTURE.md for comprehensive testing approaches
- Test in mock mode: Validate your workflow logic without API costs
- Run live: Execute with real AI providers when ready
The run_example.exs script demonstrates the pipeline:
# Quick test with mocks TEST_MODE=mock elixir run_example.exs # Full test with APIs elixir run_example.exstest/ βββ unit/ # Fast unit tests (mocked) βββ integration/ # Integration tests (live APIs) βββ fixtures/ # Test data and workflows βββ support/ # Test helpers defmodule MyTest do use ExUnit.Case use Pipeline.TestCase # Provides test mode helpers test "my feature works in mock mode" do # Test automatically uses mocks assert Pipeline.execute_something() == expected_result end endEnable debug output:
# See detailed execution logs DEBUG=true elixir run_example.exs # See API request/response details VERBOSE=true elixir run_example.exsDebug output includes:
- Step execution flow
- API request/response details
- Provider selection (mock vs live)
- Result processing
# Error: Claude CLI not found or not authenticated claude login# Error: GEMINI_API_KEY environment variable not set export GEMINI_API_KEY="your_key_here"# Check which providers are mocked vs live TEST_MODE=mock elixir run_example.exs # All mocked TEST_MODE=live elixir run_example.exs # All liveTODO: Add license information