Skip to content

Conversation

@jtomaszewski
Copy link

Summary

Adds PostgreSQL LISTEN/NOTIFY support for faster event processing as an alternative to polling. When a new event is inserted into the inbox/outbox table, a database trigger sends a notification that instantly triggers event processing, reducing latency from polling interval (default 30s) to near-instant.

Changes

  • Core package:

    • Added EventListener interface with connect(), disconnect(), and events$ Observable
    • Modified RetryableInboxOutboxEventPoller to optionally accept an event listener and merge its events with polling
    • Polling continues as fallback for startup and missed notifications
  • MikroORM driver package:

    • Added PostgreSQLEventListener implementation that uses pg LISTEN/NOTIFY
    • Takes MikroORM instance and extracts connection config automatically
    • Added migration with trigger/function for NOTIFY on INSERT
    • Includes automatic reconnection on connection errors

Usage

// In your module import { PostgreSQLEventListener, EVENT_LISTENER_TOKEN } from '@nestixis/nestjs-inbox-outbox-mikroorm-driver'; @Module({ providers: [ { provide: EVENT_LISTENER_TOKEN, useFactory: (orm: MikroORM) => new PostgreSQLEventListener(orm), inject: [MikroORM], }, ], }) export class AppModule {}

Run the migration to create the trigger:

CREATE OR REPLACE FUNCTION notify_inbox_outbox_event() RETURNS TRIGGER AS $$ BEGIN PERFORM pg_notify('inbox_outbox_event', NEW.id::text); RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE TRIGGER inbox_outbox_event_notify AFTER INSERT ON inbox_outbox_transport_event FOR EACH ROW EXECUTE FUNCTION notify_inbox_outbox_event();

Test plan

  • Unit tests for PostgreSQLEventListener
  • Integration tests verifying instant event delivery via NOTIFY
  • All existing tests continue to pass

🤖 Generated with Claude Code

jtomaszewski and others added 11 commits December 3, 2025 01:12
* test(mikroorm-driver): add comprehensive test suite for MikroORM driver - Set up Vitest with SWC for decorator metadata support - Add test utilities for spinning up isolated NestJS apps with fresh PostgreSQL databases - Add unit tests for MikroOrmInboxOutboxTransportEvent entity - Add unit tests for MikroORMDatabaseDriver (persist, remove, flush, findAndExtendReadyToRetryEvents) - Add unit tests for MikroORMDatabaseDriverFactory (forked entity managers, isolation) - Add integration tests for TransactionalEventEmitter with real database - Fix integer overflow for timestamps by using bigint column type 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * ci: add GitHub workflow for MikroORM driver tests - Add test-mikroorm-driver.yml workflow with PostgreSQL service - Revert @mikro-orm versions to ^6.3.9 as requested 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * fix(ci): build core package before running MikroORM driver tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
Fix typo in field name throughout codebase and add database migrations to rename the column from delived_to_listeners to delivered_to_listeners. BREAKING CHANGE: Existing databases require running the new migration to rename the column. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude <noreply@anthropic.com>
* fix `npm run build` for all pkgs (vibe-kanban eeb84602) * Done. Updated the paths configuration in `test-core.yml` to match the mikroorm workflow pattern: - Added `branches: [main]` filter for both push and pull_request - Added `package.json` and `package-lock.json` (dependency changes may affect tests) - Added `.github/workflows/test-core.yml` (workflow changes should trigger a run) * Done. Removed `branches: [main]` from both workflows. They'll now trigger on any branch when the relevant paths change.
1. **test-core.yml**: Added a "Type check" step that runs `npm run build` before tests 2. **test-mikroorm-driver.yml**: Added a "Type check MikroORM driver" step that runs `npm run build` on the mikroorm-driver workspace before tests This ensures TypeScript type errors are caught early in CI before tests run, preventing the issue where tests could pass while type errors exist in the code.
* Done. Here's what I added: **Created `.github/workflows/publish-preview.yml`** - A new GitHub Action workflow that: - Triggers on pull requests and pushes to main - Builds all workspace packages - Publishes preview packages using pkg.pr.new with compact URLs **Updated `package.json` (root)** - Added a root-level `build` script for npm workspaces. **Updated `packages/core/package.json`** - Added the `repository` field (other packages already had it) to enable the `--compact` flag for shorter URLs. **Next step**: Install the [pkg.pr.new GitHub App](https://github.com/apps/pkg-pr-new) on the repository to activate the preview publishing. * fix(ci): remove --compact flag (requires npm publish with repository field)
…ntPoller (#4) * test workflows run twice in a PR? (vibe-kanban 7dc44370) see #8 * feat(core): add graceful shutdown handling to RetryableInboxOutboxEventPoller Implement OnModuleDestroy lifecycle hook to properly clean up resources during application shutdown: - Unsubscribe from RxJS interval to stop polling - Track in-flight event processing and wait for completion - Add isShuttingDown flag to prevent new processing during shutdown 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
* feat(core): add immediateProcessing option to event configuration Add optional `immediateProcessing` boolean to InboxOutboxModuleEventOptions (default: true for backward compatibility). When set to false, events are only saved to DB and processed later by the poller, enabling a safer "fire and forget" pattern for crash recovery scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * add integration test for immediateProcessing (vibe-kanban 0a39979e) verify that it is not processed immediately, but eventually once poller runs * fix: use TransactionalEventEmitterOperations enum in integration tests --------- Co-authored-by: Claude <noreply@anthropic.com>
I've implemented PostgreSQL LISTEN/NOTIFY support for faster event processing. Here's what was added: 1. **`packages/core/src/poller/event-notification-listener.interface.ts`** - New interface defining the contract for notification listeners: - `connect()` / `disconnect()` lifecycle methods - `notifications$` RxJS Observable for notification events 2. **`packages/core/src/poller/retryable-inbox-outbox-event.poller.ts`** - Modified to: - Accept optional `EventNotificationListener` via `@Optional()` decorator - Connect the listener on `onModuleInit()` - Merge notification events with polling interval using RxJS `merge()` - Properly disconnect on `onModuleDestroy()` - Keep polling as fallback (both sources trigger `processBatch()`) 3. **`packages/core/src/index.ts`** - Export the new interface and token 1. **`packages/mikroorm-driver/src/migrations/MigrationInboxOutbox1733300000.ts`** - New migration that creates: - `notify_inbox_outbox_event()` PostgreSQL function - `inbox_outbox_event_notify` trigger on `inbox_outbox_transport_event` table 2. **`packages/mikroorm-driver/src/listener/postgresql-event-notification.listener.ts`** - PostgreSQL-specific implementation: - Uses `pg` Client for dedicated LISTEN connection - Handles reconnection on connection errors - Configurable reconnect delay 3. **`packages/mikroorm-driver/src/migrations/migrations.ts`** - Updated to include new migration 4. **`packages/mikroorm-driver/src/index.ts`** - Export the new listener - `postgresql-event-notification-listener.spec.ts` - Unit tests for the listener - `listen-notify-integration.spec.ts` - Integration tests verifying instant notifications To enable LISTEN/NOTIFY in an application: ```typescript import { PostgreSQLEventNotificationListener } from '@nestixis/nestjs-inbox-outbox-mikroorm-driver'; import { EVENT_NOTIFICATION_LISTENER_TOKEN } from '@nestixis/nestjs-inbox-outbox'; // In your module providers: { provide: EVENT_NOTIFICATION_LISTENER_TOKEN, useFactory: () => new PostgreSQLEventNotificationListener({ host: 'localhost', port: 5432, user: 'postgres', password: 'password', database: 'mydb', }), } ``` The implementation is backward compatible - if no notification listener is provided, the system falls back to polling-only behavior.
- Rename EventNotificationListener → EventListener - Rename EVENT_NOTIFICATION_LISTENER_TOKEN → EVENT_LISTENER_TOKEN - Rename notifications$ → events$ - Rename PostgreSQLEventNotificationListener → PostgreSQLEventListener - PostgreSQLEventListener now takes MikroORM instance and extracts connection config automatically - Updated all tests with new naming 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

1 participant