Warning
Work in progress! Pre-alpha, expect breaking changes.
Define collections — get automatic Jetstream ingestion, PDS backfill, user discovery, and typed XRPC endpoints. Runs on Cloudflare Workers + D1.
pnpm install # Edit src/config.ts with your collections pnpm generate:pull # pull lexicons from network, auto-detect fields, generate types pnpm dev:auto # start wrangler dev with auto-ingestion, leave running while you sync pnpm sync # in a different terminal, discover users + backfill records from PDSnpx wrangler d1 create contrail # Add database_id to wrangler.toml pnpm deploy # to sync in production, run it locally but set your d1 to remote, then run pnpm syncIngestion runs automatically via cron (*/1 * * * *). Schema is auto-initialized.
Edit src/config.ts — this is the only file you need to touch:
export const config: ContrailConfig = { namespace: "com.example", // your reverse-domain namespace collections: { "community.lexicon.calendar.event": { relations: { rsvps: { collection: "community.lexicon.calendar.rsvp", groupBy: "status", // materialized counts by status }, }, }, "community.lexicon.calendar.rsvp": {}, }, // profiles: ["app.bsky.actor.profile"], ← default // jetstreams: [...] ← default: 4 Bluesky jetstream endpoints // relays: [...] ← default: 2 Bluesky relay endpoints };When you run pnpm generate, queryable fields are derived from each collection's lexicon:
- String fields → equality filter (
?status=going) - Datetime/integer fields → range filters (
?startsAtMin=2026-03-16&startsAtMax=2026-04-01) - StrongRef fields →
.uriequality filter (?subjectUri=at://...)
You can override any auto-detected field by specifying queryable manually in config.
| Option | Default | Description |
|---|---|---|
queryable | auto-detected | Override auto-detected queryable fields |
discover | true | Find users via relays. false = only track known DIDs |
relations | {} | Many-to-one relationships with materialized counts |
relations.*.field | "subject.uri" | Field in the related record to match against |
relations.*.match | "uri" | Match against parent's "uri" or "did" |
relations.*.groupBy | — | Split counts by this field's value |
queries | {} | Custom query handlers |
searchable | auto-detected | FTS5 search fields. string[] = explicit fields, false = disabled, omitted = all non-range queryable fields |
profiles is a top-level config array of collection NSIDs that contain profile records (rkey self). Defaults to ["app.bsky.actor.profile"]. These are auto-added to collections with { discover: false }. Use ?profiles=true on any endpoint to include a profiles map in the response, keyed by DID, with handle and profile record data.
All endpoints at /xrpc/{nsid}.{method}:
| Endpoint | Description |
|---|---|
{collection}.listRecords | List/filter records |
{collection}.getRecord | Get single record by URI |
{namespace}.getProfile | Get a user's profile by DID or handle |
{namespace}.notifyOfUpdate | Notify of a record change for immediate indexing |
{namespace}.admin.sync | Discover + backfill (requires ADMIN_SECRET) |
{namespace}.admin.getCursor | Current cursor position |
{namespace}.admin.getOverview | All collections summary |
{namespace}.admin.reset | Delete all data (requires ADMIN_SECRET) |
Filtering:
| Param | Example | Description |
|---|---|---|
actor | ?actor=did:plc:... or ?actor=alice.bsky.social | Filter by DID or handle (triggers on-demand backfill) |
profiles | ?profiles=true | Include profile + identity info keyed by DID |
search | ?search=meetup | Full-text search across searchable fields (FTS5, ranked) |
{field} | ?status=going | Equality filter on queryable string field |
{field}Min | ?startsAtMin=2026-03-16 | Range minimum (datetime/integer fields) |
{field}Max | ?endsAtMax=2026-04-01 | Range maximum (datetime/integer fields) |
{rel}CountMin | ?rsvpsCountMin=10 | Minimum total relation count |
{rel}{Group}CountMin | ?rsvpsGoingCountMin=10 | Minimum relation count for a specific groupBy value |
hydrate{Rel} | ?hydrateRsvps=10 | Embed latest N related records (per group if grouped) |
hydrate{Ref} | ?hydrateEvent=true | Embed the referenced record |
sort | ?sort=startsAt | Sort by a queryable field or count (see below) |
order | ?order=asc | Sort direction: asc or desc (default depends on field type) |
limit | ?limit=25 | Page size (1-100, default 50) |
cursor | ?cursor=... | Pagination cursor |
Sorting — sort accepts any queryable field param name or a count field:
?sort=startsAt # by date (default: desc for range fields) ?sort=name&order=asc # by name ascending ?sort=rsvpsCount # by total RSVP count (default: desc) ?sort=rsvpsGoingCount&order=asc # by going count ascending Search uses SQLite FTS5 for ranked full-text search. By default, all non-range queryable fields are searchable. Results are ranked by relevance (BM25) with time_us as tiebreaker. Supports FTS5 syntax including prefix (meetup*), phrases ("rust meetup"), and boolean (rust OR typescript). Combinable with all other filters.
?search=meetup # basic search ?search=meetup&mode=online # search + filter ?search=rust*&sort=startsAt&order=asc # search + sort override Hydration embeds related or referenced records inline:
?hydrateRsvps=5 # latest 5 RSVPs per group (going, interested, etc.) ?hydrateEvent=true # embed the referenced event record ?hydrateRsvps=5&hydrateEvent=true # combine both # Upcoming events with 10+ going RSVPs, with RSVP records and profiles /xrpc/community.lexicon.calendar.event.listRecords?startsAtMin=2026-03-16&rsvpsGoingCountMin=10&hydrateRsvps=5&profiles=true # Events for a specific user (by handle) /xrpc/community.lexicon.calendar.event.listRecords?actor=alice.bsky.social&profiles=true # Single event with counts, RSVPs, and profiles /xrpc/community.lexicon.calendar.event.getRecord?uri=at://did:plc:.../community.lexicon.calendar.event/...&hydrateRsvps=10&profiles=true # Search for events by name/description /xrpc/community.lexicon.calendar.event.listRecords?search=meetup&profiles=true # RSVPs for a specific event, with the referenced event embedded /xrpc/community.lexicon.calendar.rsvp.listRecords?subjectUri=at://did:plc:.../community.lexicon.calendar.event/...&hydrateEvent=true&profiles=true By default, Contrail ingests from Jetstream every minute. If your app writes to a user's PDS and needs the change reflected immediately, call notifyOfUpdate right after the write:
// User creates an RSVP via their PDS const { uri } = await agent.createRecord({ ... }); // Tell Contrail to fetch and index it now await fetch("https://your-contrail.workers.dev/xrpc/com.example.notifyOfUpdate", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ uri }), });Contrail fetches the record from the user's PDS and figures out what to do:
| PDS returns | Already indexed? | Action |
|---|---|---|
| Record (new CID) | No | Create — indexes it, updates relation counts |
| Record (new CID) | Yes | Update — upserts the record |
| Record (same CID) | Yes | Skip — nothing changed |
| 404 | Yes | Delete — removes it, decrements counts |
| 404 | No | No-op |
You can also batch up to 25 URIs in one request:
await fetch(".../xrpc/com.example.notifyOfUpdate", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ uris: [uri1, uri2, uri3] }), });When Jetstream later delivers the same event, the duplicate is detected by CID and skipped.
You can get fully typed XRPC queries for any Contrail instance using @atcute/lex-cli. The lexicon files are committed to the repo, so you can pull them directly via the git source.
npm install @atcute/client @atcute/lexicons @atcute/lex-cliCreate a lex.config.js pointing at the Contrail instance's repo:
import { defineLexiconConfig } from "@atcute/lex-cli"; export default defineLexiconConfig({ outdir: "src/lexicon-types/", imports: ["@atcute/atproto"], files: ["lexicons/**/*.json"], pull: { outdir: "lexicons/", sources: [ { type: "git", remote: "https://github.com/USER/REPO.git", // the Contrail instance repo pattern: ["lexicons-generated/**/*.json", "lexicons-pulled/**/*.json", "lexicons/**/*.json"], }, ], }, });Then pull and generate:
npx lex-cli pull && npx lex-cli generateImport the generated types (side-effect import registers them with @atcute/client), then query with full type safety:
import "./lexicon-types/index.js"; // registers ambient types import { Client } from "@atcute/client"; const rpc = new Client({ handler: simpleFetchHandler({ service: /* your contrail url */ }) }); const response = await rpc.get("community.lexicon.calendar.event.getRecords", { params: { status: "going", limit: 10 }, // typed params }); if (response.ok) { console.log(response.data.records); // typed }