pg_incremental
pg_incremental : Incremental Processing by Crunchy Data
Overview
| ID | Extension | Package | Version | Category | License | Language |
|---|---|---|---|---|---|---|
| 2850 | pg_incremental | pg_incremental | 1.4.1 | FEAT | PostgreSQL | C |
| Attribute | Has Binary | Has Library | Need Load | Has DDL | Relocatable | Trusted |
|---|---|---|---|---|---|---|
| --s-d-- | No | Yes | No | Yes | No | No |
| Relationships | |
|---|---|
| Schemas | pg_catalog |
| Requires | pg_cron |
| See Also | age hll rum pg_graphql pg_jsonschema jsquery pg_hint_plan |
Packages
| Type | Repo | Version | PG Major Compatibility | Package Pattern | Dependencies |
|---|---|---|---|---|---|
| EXT | PIGSTY | 1.4.1 | 18 17 16 15 14 | pg_incremental | pg_cron |
| RPM | PIGSTY | 1.4.1 | 18 17 16 15 14 | pg_incremental_$v | pg_cron_$v |
| DEB | PIGSTY | 1.4.1 | 18 17 16 15 14 | postgresql-$v-pg-incremental | postgresql-$v-cron |
| Linux / PG | PG18 | PG17 | PG16 | PG15 | PG14 |
|---|---|---|---|---|---|
| el8.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el8.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el9.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el9.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el10.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| el10.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d12.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d12.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d13.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| d13.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u22.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u22.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u24.x86_64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
| u24.aarch64 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | MISS | MISS |
Source
```bash
pig build pkg pg_incremental;   # build rpm/deb
```

Install

Make sure the PGDG and PIGSTY repos are available:

```bash
pig repo add pgsql -u   # add both repos and update the cache
```

Install this extension with pig:

```bash
pig install pg_incremental;        # install via package name, for the active PG version
pig install pg_incremental -v 18;  # install for PG 18
pig install pg_incremental -v 17;  # install for PG 17
pig install pg_incremental -v 16;  # install for PG 16
```

Create this extension with:

```sql
CREATE EXTENSION pg_incremental CASCADE;  -- requires pg_cron
```

Usage
The pg_incremental extension provides fast, reliable incremental batch processing pipelines in PostgreSQL. You define a parameterized query, and the extension periodically executes it for newly arrived data, guaranteeing exactly-once processing.
Pipeline Types
There are three types of pipelines:
- Sequence pipelines – Process ranges of sequence values from a table
- Time interval pipelines – Process time ranges after intervals pass
- File list pipelines – Process new files from a file listing function
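The sequence and time-interval examples below assume a simple event schema. A hypothetical setup (table and column names are illustrative, not defined by the extension) might look like:

```sql
-- Source table: event_id is backed by a sequence (bigserial),
-- which sequence pipelines use to track how far they have processed.
CREATE TABLE events (
    event_id   bigserial PRIMARY KEY,
    event_time timestamptz NOT NULL DEFAULT now(),
    payload    jsonb
);

-- Rollup target; the unique day column backs the ON CONFLICT upsert
-- in the sequence pipeline example.
CREATE TABLE events_agg (
    day         timestamptz PRIMARY KEY,
    event_count bigint NOT NULL DEFAULT 0
);
```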
Sequence Pipeline
Create a pipeline that incrementally aggregates new rows using a sequence:
```sql
SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  INSERT INTO events_agg
  SELECT date_trunc('day', event_time), count(*)
  FROM events
  WHERE event_id BETWEEN $1 AND $2
  GROUP BY 1
  ON CONFLICT (day) DO UPDATE
  SET event_count = events_agg.event_count + excluded.event_count
$$);
```

`$1` and `$2` are set to the lowest and highest sequence values that can be safely processed.
With batch size limiting:
```sql
SELECT incremental.create_sequence_pipeline(
  'event-aggregation', 'events', $$ ... $$,
  schedule := '* * * * *',
  max_batch_size := 10000
);
```

Time Interval Pipeline
Process data in fixed time intervals:
```sql
SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  INSERT INTO events_agg
  SELECT event_time::date, count(distinct event_id)
  FROM events
  WHERE event_time >= $1 AND event_time < $2
  GROUP BY 1
$$);
```

`$1` and `$2` are set to the start and end (exclusive) of the time range.
For per-interval execution (e.g., daily exports):
```sql
SELECT incremental.create_time_interval_pipeline('event-export',
  time_interval := '1 day',
  batched := false,
  start_time := '2024-11-01',
  command := $$ SELECT export_events($1, $2) $$
);
```

File List Pipeline
Process new files as they appear:
```sql
SELECT incremental.create_file_list_pipeline('event-import',
  's3://mybucket/events/*.csv',
  $$ SELECT import_events($1) $$
);
```

Management Functions
| Function | Description |
|---|---|
| incremental.execute_pipeline(name) | Manually execute a pipeline (only if new data exists) |
| incremental.reset_pipeline(name) | Reset a pipeline to reprocess from the beginning |
| incremental.drop_pipeline(name) | Remove a pipeline |
| incremental.skip_file(pipeline, path) | Skip a faulty file in a file list pipeline |
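As a sketch, using the `'event-aggregation'` pipeline name from the earlier examples:

```sql
-- Run the pipeline immediately instead of waiting for its schedule
SELECT incremental.execute_pipeline('event-aggregation');

-- Start over from the beginning, e.g. after fixing the aggregation query
SELECT incremental.reset_pipeline('event-aggregation');

-- Remove the pipeline entirely
SELECT incremental.drop_pipeline('event-aggregation');
```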
Monitoring
```sql
SELECT * FROM incremental.sequence_pipelines;
SELECT * FROM incremental.time_interval_pipelines;
SELECT * FROM incremental.processed_files;
```

Check job outcomes via pg_cron:
```sql
SELECT jobname, start_time, status, return_message
FROM cron.job_run_details
JOIN cron.job USING (jobid)
WHERE jobname LIKE 'pipeline:%'
ORDER BY 1 DESC
LIMIT 5;
```