Skip to content

Commit f2cd0ae

Browse files
committed
Provide DB backed implementation of RegistryStateService
This mirrors the previous PR which provided a file backed implementation. This will be wired in a future PR. Note that I changed the DB schema to match the existing sync status structure to simplify implementation. I also added a uniqueness constraint since we now assume that there is only one row in registry_sync per registry.
1 parent 201113d commit f2cd0ae

File tree

11 files changed

+1526
-17
lines changed

11 files changed

+1526
-17
lines changed

database/migrate_test.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,26 @@ func TestMigrations(t *testing.T) {
2727
fnames, err := fs.Glob(migrationsFS, "migrations/*.up.sql")
2828
require.NoError(t, err)
2929

30+
// Test each migration individually: apply, rollback, re-apply
3031
for i := 1; i <= len(fnames); i++ {
31-
// step up
32-
err = m.Steps(i)
33-
assert.NoError(t, err)
32+
// Apply one migration
33+
err = m.Steps(1)
34+
assert.NoError(t, err, "Failed to apply migration %d", i)
3435

35-
// step down
36-
err = m.Steps(-i)
37-
assert.NoError(t, err)
36+
// Roll back one migration
37+
err = m.Steps(-1)
38+
assert.NoError(t, err, "Failed to roll back migration %d", i)
3839

39-
// step up again
40-
err = m.Steps(i)
41-
assert.NoError(t, err)
40+
// Re-apply the same migration
41+
err = m.Steps(1)
42+
assert.NoError(t, err, "Failed to re-apply migration %d", i)
4243
}
44+
45+
// Test rolling back all migrations
46+
err = m.Down()
47+
assert.NoError(t, err, "Failed to roll back all migrations")
48+
49+
// Test applying all migrations at once
50+
err = m.Up()
51+
assert.NoError(t, err, "Failed to apply all migrations")
4352
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- Remove additional status fields and unique constraint from registry_sync table
2+
3+
-- Remove the additional columns (in reverse order from up migration)
4+
ALTER TABLE registry_sync DROP COLUMN server_count;
5+
ALTER TABLE registry_sync DROP COLUMN last_applied_filter_hash;
6+
ALTER TABLE registry_sync DROP COLUMN last_sync_hash;
7+
ALTER TABLE registry_sync DROP COLUMN attempt_count;
8+
9+
-- Remove the unique constraint
10+
ALTER TABLE registry_sync DROP CONSTRAINT registry_sync_reg_id_key;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- Add unique constraint and additional status fields to registry_sync table
2+
-- This ensures that each registry can only have one sync status record
3+
-- and matches the fields in the SyncStatus struct
4+
5+
-- Add unique constraint on reg_id
6+
ALTER TABLE registry_sync ADD CONSTRAINT registry_sync_reg_id_key UNIQUE (reg_id);
7+
8+
-- Add missing fields from SyncStatus struct
9+
-- Use BIGINT for count fields to support 64-bit integers
10+
ALTER TABLE registry_sync ADD COLUMN attempt_count BIGINT NOT NULL DEFAULT 0;
11+
ALTER TABLE registry_sync ADD COLUMN last_sync_hash TEXT;
12+
ALTER TABLE registry_sync ADD COLUMN last_applied_filter_hash TEXT;
13+
ALTER TABLE registry_sync ADD COLUMN server_count BIGINT NOT NULL DEFAULT 0;

database/queries/registry.sql

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,46 @@ INSERT INTO registry (
4646
sqlc.arg(created_at),
4747
sqlc.arg(updated_at)
4848
) RETURNING id;
49+
50+
-- name: UpsertRegistry :one
51+
INSERT INTO registry (
52+
name,
53+
reg_type,
54+
created_at,
55+
updated_at
56+
) VALUES (
57+
sqlc.arg(name),
58+
sqlc.arg(reg_type),
59+
sqlc.arg(created_at),
60+
sqlc.arg(updated_at)
61+
)
62+
ON CONFLICT (name) DO UPDATE SET
63+
reg_type = EXCLUDED.reg_type,
64+
updated_at = EXCLUDED.updated_at
65+
RETURNING id;
66+
67+
-- name: BulkUpsertRegistries :many
68+
INSERT INTO registry (
69+
name,
70+
reg_type,
71+
created_at,
72+
updated_at
73+
)
74+
SELECT
75+
unnest(sqlc.arg(names)::text[]),
76+
unnest(sqlc.arg(reg_types)::registry_type[]),
77+
unnest(sqlc.arg(created_ats)::timestamp with time zone[]),
78+
unnest(sqlc.arg(updated_ats)::timestamp with time zone[])
79+
ON CONFLICT (name) DO UPDATE SET
80+
reg_type = EXCLUDED.reg_type,
81+
updated_at = EXCLUDED.updated_at
82+
RETURNING id, name;
83+
84+
-- name: DeleteRegistriesNotInList :exec
85+
DELETE FROM registry WHERE id NOT IN (SELECT unnest(sqlc.arg(ids)::uuid[]));
86+
87+
-- name: DeleteRegistry :exec
88+
DELETE FROM registry WHERE name = sqlc.arg(name);
89+
90+
-- name: ListAllRegistryNames :many
91+
SELECT name FROM registry ORDER BY name;

database/queries/sync.sql

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ SELECT id,
44
sync_status,
55
error_msg,
66
started_at,
7-
ended_at
7+
ended_at,
8+
attempt_count,
9+
last_sync_hash,
10+
last_applied_filter_hash,
11+
server_count
812
FROM registry_sync
913
WHERE id = sqlc.arg(id);
1014

@@ -27,3 +31,90 @@ UPDATE registry_sync SET
2731
error_msg = sqlc.narg(error_msg),
2832
ended_at = sqlc.arg(ended_at)
2933
WHERE id = sqlc.arg(id);
34+
35+
-- name: GetRegistrySyncByName :one
36+
SELECT rs.id,
37+
rs.reg_id,
38+
rs.sync_status,
39+
rs.error_msg,
40+
rs.started_at,
41+
rs.ended_at,
42+
rs.attempt_count,
43+
rs.last_sync_hash,
44+
rs.last_applied_filter_hash,
45+
rs.server_count
46+
FROM registry_sync rs
47+
INNER JOIN registry r ON rs.reg_id = r.id
48+
WHERE r.name = sqlc.arg(name);
49+
50+
-- name: ListRegistrySyncs :many
51+
SELECT r.name,
52+
rs.id,
53+
rs.reg_id,
54+
rs.sync_status,
55+
rs.error_msg,
56+
rs.started_at,
57+
rs.ended_at,
58+
rs.attempt_count,
59+
rs.last_sync_hash,
60+
rs.last_applied_filter_hash,
61+
rs.server_count
62+
FROM registry_sync rs
63+
INNER JOIN registry r ON rs.reg_id = r.id
64+
ORDER BY r.name;
65+
66+
-- name: UpsertRegistrySyncByName :exec
67+
INSERT INTO registry_sync (
68+
reg_id,
69+
sync_status,
70+
error_msg,
71+
started_at,
72+
ended_at,
73+
attempt_count,
74+
last_sync_hash,
75+
last_applied_filter_hash,
76+
server_count
77+
) VALUES (
78+
(SELECT id FROM registry WHERE name = sqlc.arg(name)),
79+
sqlc.arg(sync_status),
80+
sqlc.narg(error_msg),
81+
sqlc.arg(started_at),
82+
sqlc.arg(ended_at),
83+
sqlc.arg(attempt_count),
84+
sqlc.narg(last_sync_hash),
85+
sqlc.narg(last_applied_filter_hash),
86+
sqlc.arg(server_count)
87+
)
88+
ON CONFLICT (reg_id) DO UPDATE SET
89+
sync_status = EXCLUDED.sync_status,
90+
error_msg = EXCLUDED.error_msg,
91+
started_at = EXCLUDED.started_at,
92+
ended_at = EXCLUDED.ended_at,
93+
attempt_count = EXCLUDED.attempt_count,
94+
last_sync_hash = EXCLUDED.last_sync_hash,
95+
last_applied_filter_hash = EXCLUDED.last_applied_filter_hash,
96+
server_count = EXCLUDED.server_count;
97+
98+
-- name: InitializeRegistrySync :exec
99+
INSERT INTO registry_sync (
100+
reg_id,
101+
sync_status,
102+
error_msg
103+
) VALUES (
104+
(SELECT id FROM registry WHERE name = sqlc.arg(name)),
105+
sqlc.arg(sync_status),
106+
sqlc.arg(error_msg)
107+
)
108+
ON CONFLICT (reg_id) DO NOTHING;
109+
110+
-- name: BulkInitializeRegistrySyncs :exec
111+
INSERT INTO registry_sync (
112+
reg_id,
113+
sync_status,
114+
error_msg
115+
)
116+
SELECT
117+
unnest(sqlc.arg(reg_ids)::uuid[]),
118+
unnest(sqlc.arg(sync_statuses)::sync_status[]),
119+
unnest(sqlc.arg(error_msgs)::text[])
120+
ON CONFLICT (reg_id) DO NOTHING;

internal/db/sqlc/models.go

Lines changed: 10 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/db/sqlc/querier.go

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)