alchemy-effect is in alpha and not ready for production use (expect breaking changes). Come hang in our Discord to participate in the early stages of development.
Infrastructure-as-Effects — unify your business logic and infrastructure into a single, type-safe Effect program.
bun add alchemy-effect effectA Stack is an Effect that declares Resources and returns outputs. Wire it up with Stack.make and provide cloud providers as Layers.
import { AWS, Stack } from "alchemy-effect"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; export default Effect.gen(function* () { const bucket = yield* AWS.S3.Bucket("MyBucket"); return { bucketArn: bucket.bucketArn, }; }).pipe(Stack.make("MyStack"), Effect.provide(AWS.providers()));Resources are declared inline as Effects. They produce typed Output Attributes that flow into other Resources.
const bucket = yield * AWS.S3.Bucket("DataBucket", { forceDestroy: true, }); const queue = yield * AWS.SQS.Queue("JobsQueue", { fifo: true, visibilityTimeout: 60, }); const table = yield * AWS.DynamoDB.Table("UsersTable", { tableName: "users", partitionKey: { name: "pk", type: "S" }, sortKey: { name: "sk", type: "S" }, });Output Attributes from one Resource can be passed as Input Properties to another — the engine resolves the dependency graph automatically.
A Lambda Function is a special Resource whose Effect body defines the runtime behavior. The returned object configures the function's infrastructure.
import * as Lambda from "alchemy-effect/AWS/Lambda"; import * as S3 from "alchemy-effect/AWS/S3"; import * as Http from "alchemy-effect/Http"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; export default Effect.gen(function* () { const bucket = yield* S3.Bucket("DataBucket"); // bind S3 capabilities to this function's runtime const getObject = yield* S3.GetObject.bind(bucket); const putObject = yield* S3.PutObject.bind(bucket); // register a HTTP server for the Lambda runtime yield* Http.serve(myHttpApp); return { main: import.meta.filename, memory: 1024, url: true, } as const; }).pipe( Effect.provide( Layer.mergeAll(Http.lambdaHttpServer, S3.GetObjectLive, S3.PutObjectLive), ), Lambda.Function("ApiFunction"), );Encapsulate Resources and capabilities into Effect Services for clean separation of concerns.
import * as S3 from "alchemy-effect/AWS/S3"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as ServiceMap from "effect/ServiceMap"; export class JobStorage extends ServiceMap.Service< JobStorage, { bucket: S3.Bucket; putJob(job: Job): Effect.Effect<Job>; getJob(jobId: string): Effect.Effect<Job | undefined>; } >()("JobStorage") {} export const jobStorage = Layer.effect( JobStorage, Effect.gen(function* () { const bucket = yield* S3.Bucket("JobsBucket"); const getObject = yield* S3.GetObject.bind(bucket); const putObject = yield* S3.PutObject.bind(bucket); return JobStorage.of({ bucket, putJob: (job) => putObject({ Key: job.id, Body: JSON.stringify(job) }).pipe( Effect.map(() => job), Effect.orDie, ), getJob: (jobId) => getObject({ Key: jobId }).pipe( Effect.map((item) => item.Body as any), Effect.orDie, ), }); }), );Then provide it as a Layer to your Lambda Function:
export default Effect.gen(function* () { const { bucket, getJob } = yield* JobStorage; // ... return { main: import.meta.filename, url: true } as const; }).pipe(Effect.provide(jobStorage), Lambda.Function("JobFunction"));Subscribe to S3 notifications, SQS queues, and other event sources as Streams.
import * as Stream from "effect/Stream"; yield * S3.notifications(bucket).subscribe((stream) => stream.pipe( Stream.flatMap((item) => Stream.fromEffect(getJob(item.key))), Stream.tapSink(sink), Stream.runDrain, ), );Serve an Effect HttpApi directly from a Lambda Function. Define endpoints, implement handlers, build a Layer, and convert it to an Effect that Http.serve can register.
import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as Schema from "effect/Schema"; import * as HttpRouter from "effect/unstable/http/HttpRouter"; import * as HttpServer from "effect/unstable/http/HttpServer"; import * as HttpApi from "effect/unstable/httpapi/HttpApi"; import * as HttpApiBuilder from "effect/unstable/httpapi/HttpApiBuilder"; import * as HttpApiEndpoint from "effect/unstable/httpapi/HttpApiEndpoint"; import * as HttpApiGroup from "effect/unstable/httpapi/HttpApiGroup"; // 1. Define endpoints const getJob = HttpApiEndpoint.get("getJob", "/", { success: Job, params: { jobId: JobId }, }); const createJob = HttpApiEndpoint.post("createJob", "/", { success: JobId, payload: Schema.Struct({ content: Schema.String }), }); const JobApi = HttpApi.make("JobApi").add( HttpApiGroup.make("Jobs").add(getJob, createJob), ); // 2. Implement handlers const JobApiHandlers = HttpApiBuilder.group(JobApi, "Jobs", (handlers) => handlers .handle( "getJob", Effect.fn(function* (req) { const storage = yield* JobStorage; return yield* storage.getJob(req.params.jobId); }), ) .handle( "createJob", Effect.fn(function* (req) { const storage = yield* JobStorage; const job = yield* storage.putJob({ id: "TODO", content: req.payload.content, }); return job.id; }), ), ); // 3. Build the API Layer and convert to an HttpEffect const JobApiLive = HttpApiBuilder.layer(JobApi).pipe( Layer.provide(JobApiHandlers), Layer.provide(HttpServer.layerServices), ); export const JobHttpEffect = HttpRouter.toHttpEffect(JobApiLive);Then serve it inside your Lambda Function:
yield * Http.serve(JobHttpEffect);Effect's RPC layer works the same way.
import { Rpc, RpcGroup, RpcServer } from "effect/unstable/rpc"; const getJob = Rpc.make("getJob", { success: Job, error: JobNotFound, payload: { jobId: JobId }, }); export class JobRpcs extends RpcGroup.make(getJob, createJob) {} export const JobRpcHttpEffect = RpcServer.toHttpEffect(JobRpcs).pipe( Effect.provide(JobRpcsLive), );Control what happens when a Resource is removed from your stack.
import { RemovalPolicy } from "alchemy-effect"; const queue = yield * SQS.Queue("JobsQueue").pipe(RemovalPolicy.retain(stage === "prod"));Configure AWS credentials and region per stage using Effect Layers and Config.
import { AWS, Stage } from "alchemy-effect"; import * as Config from "effect/Config"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; const awsConfig = Layer.effect( AWS.StageConfig, Effect.gen(function* () { const stage = yield* Stage; if (stage === "prod") { return AWS.StageConfig.of({ account: "123456789012", region: "us-west-2", }); } return AWS.StageConfig.of({ profile: "dev", account: "987654321098", region: yield* Config.string("AWS_REGION").pipe( Config.withDefault("us-west-2"), ), }); }), ); export default Effect.gen(function* () { // ... }).pipe( Stack.make("MyStack"), Effect.provide(Layer.provide(AWS.providers(), awsConfig)), );The sections below explain how to implement your own Resources, Resource Providers, and Bindings.
A Resource is defined by its Props (input) and Attributes (output). Use the Resource constructor to register the type.
import { Resource } from "alchemy-effect/Resource"; export interface StreamProps { streamName?: string; streamMode?: "PROVISIONED" | "ON_DEMAND"; shardCount?: number; retentionPeriodHours?: number; encryption?: boolean; kmsKeyId?: string; tags?: Record<string, string>; } export interface Stream extends Resource< "AWS.Kinesis.Stream", StreamProps, { streamName: string; streamArn: string; streamStatus: "CREATING" | "DELETING" | "ACTIVE" | "UPDATING"; } > {} export const Stream = Resource<Stream>("AWS.Kinesis.Stream");Users interact with the Resource as an Effect:
const stream = yield * Stream("MyStream", { streamMode: "ON_DEMAND", retentionPeriodHours: 48, }); yield * Console.log(stream.streamArn); // typed Output attributeA Resource Provider implements the lifecycle operations: create, update, delete, and optionally diff and read. It is registered via Resource.provider.effect(...).
export const StreamProvider = () => Stream.provider.effect( Effect.gen(function* () { const region = yield* Region; const accountId = yield* Account; return { // properties that are stable across updates (never change) stables: ["streamName", "streamArn"], // determine if a prop change requires replace vs update diff: Effect.fn(function* ({ id, news, olds }) { const oldName = olds.streamName ?? (yield* createPhysicalName({ id })); const newName = news.streamName ?? (yield* createPhysicalName({ id })); if (oldName !== newName) { return { action: "replace" } as const; } // returning void means "use default update logic" }), create: Effect.fn(function* ({ id, news, session }) { const streamName = news.streamName ?? (yield* createPhysicalName({ id, maxLength: 128 })); yield* kinesis .createStream({ StreamName: streamName, StreamModeDetails: { StreamMode: news.streamMode ?? "ON_DEMAND" }, ShardCount: news.streamMode === "PROVISIONED" ? news.shardCount : undefined, }) .pipe(Effect.catchTag("ResourceInUseException", () => Effect.void)); yield* waitForStreamActive(streamName); return { streamName, streamArn: `arn:aws:kinesis:${region}:${accountId}:stream/${streamName}`, streamStatus: "ACTIVE" as const, }; }), update: Effect.fn(function* ({ news, olds, output, session }) { // handle stream mode, shard count, retention, encryption changes // ... return output; }), delete: Effect.fn(function* ({ output }) { yield* kinesis .deleteStream({ StreamName: output.streamName, EnforceConsumerDeletion: true, }) .pipe( Effect.catchTag("ResourceNotFoundException", () => Effect.void), ); }), }; }), );Key design principles:
- Idempotent create — handle the case where the resource already exists (e.g. catch
ResourceInUseException). - Idempotent delete — if the resource is already gone, don't error (e.g. catch
ResourceNotFoundException). - Eventual consistency — wait for the resource to reach a steady state before returning.
- Tags — use
createInternalTagsanddiffTagsfromalchemy-effect/Tagsto brand resources and diff tag changes.
Every capability (e.g. S3.GetObject, Kinesis.PutRecord) is split into two layers:
Binding.Service— the runtime implementation (SDK call). Provided on the Function Effect so it is bundled into your Lambda/Worker.Binding.Policy— the deploy-time IAM policy attachment. Provided on the Stack viaAWS.providers()so it only runs duringplan/deploy, never at runtime.
This separation means your Lambda bundle only includes the code it needs, while IAM policies are resolved at deploy time.
A Binding.Service wraps an SDK client and exposes a .bind(resource) method that returns a typed function for runtime use.
import * as Kinesis from "@distilled.cloud/aws/kinesis"; import * as Binding from "alchemy-effect/Binding"; export class PutRecord extends Binding.Service< PutRecord, ( stream: Stream, ) => Effect.Effect< ( request: PutRecordRequest, ) => Effect.Effect<Kinesis.PutRecordOutput, Kinesis.PutRecordError> > >()("AWS.Kinesis.PutRecord") {} export const PutRecordLive = Layer.effect( PutRecord, Effect.gen(function* () { const Policy = yield* PutRecordPolicy; const putRecord = yield* Kinesis.putRecord; return Effect.fn(function* (stream: Stream) { const StreamName = yield* stream.streamName; yield* Policy(stream); return Effect.fn(function* (request: PutRecordRequest) { return yield* putRecord({ ...request, StreamName: yield* StreamName }); }); }); }), );Provide the *Live layer on the Function — this is what gets bundled into the Lambda:
export default Effect.gen(function* () { const stream = yield* Kinesis.Stream("Events", { streamMode: "ON_DEMAND" }); const putRecord = yield* Kinesis.PutRecord.bind(stream); // use putRecord(...) at runtime return { main: import.meta.filename } as const; }).pipe(Effect.provide(Kinesis.PutRecordLive), Lambda.Function("Producer"));A Binding.Policy runs only at deploy time to attach IAM policies (or Cloudflare bindings) to the Function's role. At runtime, Binding.Policy uses Effect.serviceOption so it gracefully becomes a no-op when the layer is not provided.
import { isFunction } from "alchemy-effect/AWS/Lambda/Function"; export class PutRecordPolicy extends Binding.Policy< PutRecordPolicy, (stream: Stream) => Effect.Effect<void> >()("AWS.Kinesis.PutRecord") {} export const PutRecordPolicyLive = PutRecordPolicy.layer.succeed( Effect.fn(function* (ctx, stream: Stream) { if (isFunction(ctx)) { yield* ctx.bind({ policyStatements: [ { Sid: "PutRecord", Effect: "Allow", Action: ["kinesis:PutRecord"], Resource: [Output.interpolate`${stream.streamArn}`], }, ], }); } else { return yield* Effect.die( `PutRecordPolicy does not support runtime '${ctx.type}'`, ); } }), );Policy layers are provided on the Stack through AWS.providers(), not on the Function:
// alchemy.run.ts — stack entrypoint export default Effect.gen(function* () { const func = yield* MyFunction; return { url: func.functionUrl }; }).pipe( Stack.make("MyStack"), // AWS.providers() includes all *PolicyLive layers (deploy-time only) Effect.provide(Layer.provide(AWS.providers(), awsConfig)), );Stack (alchemy.run.ts) ├── AWS.providers() │ ├── Resource Providers (StreamProvider, BucketProvider, ...) │ └── Binding Policies (PutRecordPolicyLive, GetObjectPolicyLive, ...) ← deploy-time only │ └── Lambda.Function("Producer") └── Effect.provide(...) ├── Kinesis.PutRecordLive ← bundled into Lambda (runtime) └── Http.lambdaHttpServer | Service | Resources |
|---|---|
| DynamoDB | Table |
| EC2 | VPC, Subnet, InternetGateway, RouteTable, SecurityGroup, ... |
| EventBridge | Rule, EventBus |
| IAM | Role, Policy |
| Kinesis | Stream |
| Lambda | Function |
| S3 | Bucket, GetObject, PutObject, Notifications |
| SQS | Queue, SendMessage, QueueSink, EventSource |
Apache-2.0