FlowWatch is a tiny ergonomic layer on top of watchfiles that makes it easy to build file-driven workflows using simple decorators and a pretty Rich + Typer powered CLI.
Instead of wiring watchfiles.watch() manually in every project, you declare:
- what folder(s) you want to watch
- which patterns you care about (e.g. `*.mxf`, `*.json`)
- which function should run for a given event (created / modified / deleted)
FlowWatch takes care of:
- subscribing to all roots in a single watcher loop
- debouncing and recursive watching
- dispatching events to handlers with a small thread pool
- optional processing of existing files on startup
- nicely formatted logs and a CLI overview of registered handlers
- real-time web dashboard for monitoring events
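To make the dispatch step in the list above more concrete, here is a minimal, standard-library-only sketch of matching changed paths against glob patterns and handing the matches to a small thread pool. It is an illustration under stated assumptions, not FlowWatch's actual implementation: `Registration`, `dispatch`, and the sample paths are hypothetical names invented for this example.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from fnmatch import fnmatch
from pathlib import Path
from typing import Callable


@dataclass
class Registration:
    """Hypothetical record pairing a glob pattern with a handler."""
    pattern: str
    handler: Callable[[Path], str]


def dispatch(paths: list[Path], registrations: list[Registration],
             max_workers: int = 4) -> list[str]:
    """Run every handler whose pattern matches a path's file name."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(reg.handler, path)
            for path in paths
            for reg in registrations
            if fnmatch(path.name, reg.pattern)
        ]
        # Collect results in submission order; result() re-raises handler errors.
        return [future.result() for future in futures]


registrations = [
    Registration("*.mxf", lambda p: f"ingest {p.name}"),
    Registration("*.json", lambda p: f"reload {p.name}"),
]
results = dispatch(
    [Path("inbox/clip.mxf"), Path("inbox/notes.txt"), Path("inbox/config.json")],
    registrations,
)
print(results)
```

In this sketch `notes.txt` matches no pattern and is skipped, which is the same filtering idea the `pattern` argument expresses in the decorators.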
FlowWatch is published as a normal Python package.
```bash
# Using uv (recommended)
uv add flowwatch

# Or with pip
pip install flowwatch
```

```bash
# Standalone dashboard (Starlette + uvicorn)
uv add flowwatch --extra dashboard
pip install flowwatch[dashboard]

# FastAPI integration (mount in your FastAPI app)
uv add flowwatch --extra fastapi
pip install flowwatch[fastapi]

# All features
uv add flowwatch --extra all
pip install flowwatch[all]
```

```python
from pathlib import Path

from flowwatch import FileEvent, on_created, run

WATCH_DIR = Path("inbox")
WATCH_DIR.mkdir(exist_ok=True)


@on_created(str(WATCH_DIR), pattern="*.txt", process_existing=True)
def handle_new_text(event: FileEvent) -> None:
    print(f"New text file: {event.path}")


if __name__ == "__main__":
    run()  # blocks until Ctrl+C
```

Drop `*.txt` files into `inbox/` and watch the handler fire.
See the examples/ directory for more complete examples:
- `basic.py` - Simple sync handlers
- `async_handlers.py` - Mixed sync and async handlers
- `dashboard.py` - Standalone web dashboard
- `fastapi_integration.py` - Mount dashboard in FastAPI apps
Handlers receive a FileEvent object describing what happened:
| Attribute | Description |
|---|---|
| `event.change` | `watchfiles.Change` (added, modified, deleted) |
| `event.path` | `pathlib.Path` pointing to the file |
| `event.root` | The root folder you registered |
| `event.pattern` | The glob pattern that matched (if any) |
Convenience properties:
- `event.is_created`
- `event.is_modified`
- `event.is_deleted`
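The attributes and convenience properties above can be pictured with a small stand-in class. This is a sketch only: `FileEventSketch` and the local `Change` enum are hypothetical substitutes so the example runs without installing anything, and the mapping of `is_created` to `Change.added` is an assumption based on the table, not a statement about FlowWatch's source.

```python
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from typing import Optional


class Change(IntEnum):
    """Local stand-in mirroring the members of watchfiles.Change."""
    added = 1
    modified = 2
    deleted = 3


@dataclass(frozen=True)
class FileEventSketch:
    """Hypothetical mirror of the attributes listed above."""
    change: Change
    path: Path
    root: Path
    pattern: Optional[str] = None

    @property
    def is_created(self) -> bool:
        return self.change is Change.added

    @property
    def is_modified(self) -> bool:
        return self.change is Change.modified

    @property
    def is_deleted(self) -> bool:
        return self.change is Change.deleted


def describe(event: FileEventSketch) -> str:
    """Branch on the convenience properties, as a handler might."""
    if event.is_created:
        return f"created {event.path.name}"
    if event.is_modified:
        return f"modified {event.path.name}"
    return f"deleted {event.path.name}"


event = FileEventSketch(Change.added, Path("inbox/report.txt"), Path("inbox"), "*.txt")
print(describe(event))
```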
Register handlers using decorators from flowwatch:
```python
@on_created(root, pattern="*.txt", process_existing=True)
@on_modified(root, pattern="*.json")
@on_deleted(root, pattern="*.bak")
@on_any(root, pattern="*.*")  # all events
```

Behind the scenes these attach to a global `FlowWatchApp` instance, which you can run using `flowwatch.run()` or via the CLI.
FlowWatch natively supports both sync and async handlers. Async handlers are automatically detected and executed in a dedicated event loop thread:
```python
import aiohttp

from flowwatch import FileEvent, on_created, run

WATCH_DIR = "./inbox"


# Sync handler - runs in thread pool
@on_created(WATCH_DIR, pattern="*.txt")
def handle_sync(event: FileEvent) -> None:
    print(f"Sync: {event.path}")


# Async handler - runs in async event loop
@on_created(WATCH_DIR, pattern="*.json")
async def handle_async(event: FileEvent) -> None:
    async with aiohttp.ClientSession() as session:
        await session.post("https://api.example.com/webhook", json={
            "file": str(event.path),
            "event": event.change.name,
        })


if __name__ == "__main__":
    run()
```

Async handlers are ideal for:
- HTTP/API calls (using `aiohttp`, `httpx`)
- Database operations (using `asyncpg`, `motor`)
- Any I/O-bound work that benefits from `async`/`await`
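One plausible way to route sync and async handlers differently, as described above, is to check each handler with `inspect.iscoroutinefunction` and submit coroutines to an event loop running in its own thread. The sketch below uses only the standard library. `start_loop_thread` and `submit` are hypothetical helpers, and FlowWatch's real detection and scheduling may differ.

```python
import asyncio
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor


def start_loop_thread() -> asyncio.AbstractEventLoop:
    """Start an event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def submit(handler, payload, loop, pool):
    """Route coroutine functions to the loop, plain callables to the pool."""
    if inspect.iscoroutinefunction(handler):
        return asyncio.run_coroutine_threadsafe(handler(payload), loop)
    return pool.submit(handler, payload)


def sync_handler(path: str) -> str:
    return f"sync:{path}"


async def async_handler(path: str) -> str:
    await asyncio.sleep(0)  # placeholder for real async I/O
    return f"async:{path}"


loop = start_loop_thread()
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [submit(h, "inbox/a.json", loop, pool) for h in (sync_handler, async_handler)]
    # Both branches return concurrent.futures.Future objects.
    results = [f.result(timeout=5) for f in futures]
loop.call_soon_threadsafe(loop.stop)
print(results)
```

Because both branches return a `concurrent.futures.Future`, the caller can wait on sync and async work the same way.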
Note: The dashboard requires optional dependencies. Install with `uv add flowwatch --extra dashboard`.
FlowWatch includes a real-time web dashboard for monitoring file events.
Features:
- Live event streaming via Server-Sent Events (SSE)
- Event statistics (created, modified, deleted counts)
- Watched directories overview
- File preview — click any event to view file contents with syntax highlighting
- Health check endpoint for container orchestration (`/health`)
Click any event row to expand it and view the file contents with syntax highlighting.
From Python:
```python
from flowwatch import run_with_dashboard

# ... define your handlers ...

if __name__ == "__main__":
    run_with_dashboard(port=8765, open_browser=True)
```

From CLI:

```bash
flowwatch run my_handlers.py --dashboard --dashboard-port 8765
```

The dashboard exposes a health endpoint for monitoring:

```bash
curl http://localhost:8765/health
```

```json
{
  "status": "healthy",
  "uptime_seconds": 123.45,
  "handlers_count": 5,
  "roots_count": 2,
  "events_processed": 42
}
```

Mount the FlowWatch dashboard in your existing FastAPI application:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from flowwatch import FlowWatchApp, create_dashboard_routes, on_created

flowwatch = FlowWatchApp()


@on_created("./watch_dir", pattern="*.txt", app=flowwatch)
def handle_file(event):
    print(f"New file: {event.path}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    flowwatch.start()  # Start watching in background
    yield
    flowwatch.stop()  # Graceful shutdown


app = FastAPI(lifespan=lifespan)

# Mount dashboard at /flowwatch/
app.include_router(
    create_dashboard_routes(flowwatch),
    prefix="/flowwatch",
)
```

Run with:

```bash
uv run fastapi run your_app.py
```

The dashboard is then available at `http://localhost:8000/flowwatch/`.
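For scripted monitoring, the standalone dashboard's `/health` endpoint shown earlier can also be polled from Python instead of `curl`. The sketch below assumes the response has the JSON shape of the sample payload and treats `"status": "healthy"` as the success signal. `parse_health` and `check_health` are hypothetical helper names, and any other status values are not documented here.

```python
import json
import urllib.request


def parse_health(body: str) -> bool:
    """Return True when the JSON payload reports a healthy status."""
    payload = json.loads(body)
    return payload.get("status") == "healthy"


def check_health(url: str = "http://localhost:8765/health", timeout: float = 5.0) -> bool:
    """Fetch the endpoint; treat network or parsing errors as unhealthy."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return parse_health(response.read().decode("utf-8"))
    except (OSError, ValueError):
        return False


# Sample payload copied from the curl example, so no server is needed here.
sample = (
    '{"status": "healthy", "uptime_seconds": 123.45, '
    '"handlers_count": 5, "roots_count": 2, "events_processed": 42}'
)
print(parse_health(sample))
```

Calling `check_health()` against a running dashboard performs the real request. Only the parsing step is exercised here.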
FlowWatch ships with a Typer + Rich powered CLI.
```bash
flowwatch run myproject.watchers
```

Or run a Python file directly:

```bash
flowwatch run ./my_handlers.py
```

The CLI will:
- Import your handlers module
- Show a Rich table with handlers, roots, events, patterns, and priorities
- Start the watcher loop with pretty logs
```bash
flowwatch run myproject.watchers \
  --debounce 2.0 \
  --max-workers 8 \
  --no-recursive \
  --log-level DEBUG \
  --json-logs \
  --dashboard \
  --dashboard-port 8080
```

- `--debounce`: Debounce interval in seconds (default: 1.6)
- `--max-workers`: Thread pool size (default: 4)
- `--no-recursive`: Don't watch subdirectories
- `--log-level`: Log level: DEBUG, INFO, WARNING, ERROR
- `--json-logs`: JSON-formatted logs for production
- `--dashboard`: Open web dashboard
- `--dashboard-port`: Dashboard port (default: 8765)

For production environments and log aggregation systems (ELK, Datadog, CloudWatch), enable JSON logs:
```bash
flowwatch run myproject.watchers --json-logs
```

Output:

```json
{
  "timestamp": "2026-01-11T10:30:45.123456+00:00",
  "level": "INFO",
  "logger": "flowwatch",
  "message": "FlowWatch starting on roots: /data/inbox"
}
```

For more control, instantiate your own `FlowWatchApp`:

```python
from pathlib import Path

from watchfiles import Change

from flowwatch import FileEvent, FlowWatchApp

app = FlowWatchApp(
    name="my-custom-app",
    debounce=0.7,
    max_workers=8,
    json_logs=True,  # Enable structured JSON logging
)


def handle_any(event: FileEvent) -> None:
    print(event.change, event.path)


app.add_handler(
    handle_any,
    root=Path("data"),
    events=[Change.added, Change.modified, Change.deleted],
    pattern="*.*",
    process_existing=True,
)

app.run()
```

A common pattern is to run FlowWatch as its own worker container:
```yaml
# docker-compose.yml
services:
  backend:
    build: ./backend
    volumes:
      - media:/media

  flowwatch:
    build: ./backend
    command: flowwatch run myproject.watchers --json-logs
    depends_on:
      - backend
    volumes:
      - media:/media
    restart: unless-stopped
    # /health is served by the dashboard, so this check assumes the
    # dashboard is enabled and that curl is installed in the image.
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8765/health"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  media:
```

| Decorator | Description |
|---|---|
| `@on_created` | Triggers on new files |
| `@on_modified` | Triggers when files are changed |
| `@on_deleted` | Triggers when files are removed |
| `@on_any` | Triggers on any file event |
All decorators accept:
- `root`: Directory to watch (string or `Path`)
- `pattern`: Glob pattern (e.g., `"*.txt"`, `"**/*.json"`)
- `process_existing`: Process existing files on startup (default: `False`)
- `priority`: Handler priority, higher runs first (default: `0`)
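Two of these options, `priority` and `process_existing`, can be illustrated with plain Python. This is a sketch under stated assumptions, not the library's code: `order_by_priority` and `existing_files` are hypothetical helpers, and keeping registration order for equal priorities is an assumption made here, since the text only says that higher priorities run first.

```python
import tempfile
from pathlib import Path


def order_by_priority(handlers: list[tuple[str, int]]) -> list[str]:
    """Return handler names with the highest priority first."""
    # sorted() is stable, so equal priorities keep registration order.
    ranked = sorted(handlers, key=lambda item: item[1], reverse=True)
    return [name for name, _ in ranked]


def existing_files(root: Path, pattern: str) -> list[Path]:
    """List files already under root that match a glob pattern."""
    return sorted(p for p in root.glob(pattern) if p.is_file())


order = order_by_priority([("archive", 0), ("validate", 10), ("notify", 0)])
print(order)

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_text("one")
    (root / "b.json").write_text("{}")
    # Only files present before "startup" that match the pattern are returned.
    found = [p.name for p in existing_files(root, "*.txt")]
print(found)
```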
| Function | Description |
|---|---|
| `run()` | Start the default `FlowWatchApp` |
| `run_with_dashboard()` | Start with standalone web dashboard |
| `stop_dashboard()` | Stop the standalone dashboard server |
| `create_dashboard_routes()` | Create FastAPI router for dashboard |
| Class | Description |
|---|---|
| `FlowWatchApp` | Main application for custom configurations |
| `FileEvent` | Event object passed to handlers |
| `JsonFormatter` | Logging formatter for structured JSON output |
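To show the kind of output a formatter like `JsonFormatter` produces, here is a standard-library sketch that emits the same four fields as the JSON log sample earlier (`timestamp`, `level`, `logger`, `message`). `JsonFormatterSketch` is a hypothetical stand-in and makes no claim about how FlowWatch's own class is implemented.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatterSketch(logging.Formatter):
    """Hypothetical stand-in, not FlowWatch's JsonFormatter."""

    def format(self, record: logging.LogRecord) -> str:
        # Serialize a fixed set of fields as one JSON object per log record.
        return json.dumps({
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


# Build a record by hand so the example has no side effects on global logging.
record = logging.LogRecord(
    "flowwatch", logging.INFO, "sketch.py", 1,
    "FlowWatch starting on roots: %s", ("/data/inbox",), None,
)
line = JsonFormatterSketch().format(record)
parsed = json.loads(line)
print(parsed["message"])
```

Attaching such a formatter to a `logging.StreamHandler` via `setFormatter` would apply it to every record that handler emits.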
| Method | Description |
|---|---|
| `add_handler()` | Register a handler function |
| `run()` | Start watching (blocking) |
| `start()` | Start watching in background thread (non-blocking) |
| `stop()` | Stop background watcher gracefully |
| `is_running` | Property: check if watcher is running |
For type-annotating your handlers:
| Alias | Description |
|---|---|
| `SyncHandler` | `Callable[[FileEvent], None]` |
| `AsyncHandler` | `Callable[[FileEvent], Coroutine[Any, Any, None]]` |
| `Handler` | `SyncHandler \| AsyncHandler` |
FlowWatch is a good fit when you want:
- Simple file pipelines like:
  - "When a new MXF appears here, run this ingester."
  - "When a JSON config changes, reload some state."
  - "When a sidecar file is deleted, clean up something else."
- Readable, declarative code where intent is obvious from decorators
- Pretty terminal UX when running workers in Docker or on bare metal
- Real-time monitoring via the web dashboard
It is not trying to be a full-blown workflow engine. Think of it as a thin, Pythonic glue layer over watchfiles.
```bash
# Clone and install dev dependencies
git clone https://github.com/MichielMe/flowwatch.git
cd flowwatch
uv sync --all-extras

# Run tests
uv run pytest

# Run with coverage
uv run pytest --cov=flowwatch --cov-report=term-missing

# Lint and type check
uv run ruff check src/
uv run mypy src/
```

See CONTRIBUTING.md for detailed development guidelines, code style, and pull request process.
MIT License - see LICENSE for details.

