@@ -7,10 +7,10 @@ do $$

drop table if exists moves_view;

-create temp table moves_view as
+create table moves_view as
select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
from (
-select transactions_seq::numeric, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
+select transactions_seq, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
from (
SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
first_value(post_commit_volumes) OVER (
@@ -27,8 +27,11 @@ do $$
group by transactions_seq;

create index moves_view_idx on moves_view(transactions_seq);
+-- speed up hash join when updating rows later
+alter table moves_view add foreign key(transactions_seq) references transactions(seq);

if (select count(*) from moves_view) = 0 then
+drop table moves_view;
return;
end if;

@@ -46,7 +49,10 @@ do $$
from data
where transactions.seq = data.transactions_seq;

-exit when not found;
+if not found then
+drop table moves_view;
+exit;
+end if;

_offset = _offset + _batch_size;

internal/storage/bucket/migrations/27-fix-invalid-pcv/up.sql (12 changes: 9 additions & 3 deletions)

@@ -7,10 +7,10 @@ do $$

drop table if exists moves_view;

-create temp table moves_view as
+create table moves_view as
select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
from (
-select transactions_seq::numeric, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
+select transactions_seq, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
from (
SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
first_value(post_commit_volumes) OVER (
@@ -27,8 +27,11 @@ do $$
group by transactions_seq;

create index moves_view_idx on moves_view(transactions_seq);
+-- speed up hash join when updating rows later
+alter table moves_view add foreign key(transactions_seq) references transactions(seq);

if (select count(*) from moves_view) = 0 then
+drop table moves_view;
return;
end if;

@@ -46,7 +49,10 @@ do $$
from data
where transactions.seq = data.transactions_seq;

-exit when not found;
+if not found then
+drop table moves_view;
+exit;
+end if;

_offset = _offset + _batch_size;

@@ -7,10 +7,10 @@ do $$

drop table if exists moves_view;

-create temp table moves_view as
+create table moves_view as
select transactions_seq, public.aggregate_objects(jsonb_build_object(accounts_address, volumes)) as volumes
from (
-select transactions_seq::numeric, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
+select transactions_seq, accounts_address, public.aggregate_objects(json_build_object(asset, json_build_object('input', (post_commit_volumes).inputs, 'output', (post_commit_volumes).outputs))::jsonb) as volumes
from (
SELECT DISTINCT ON (moves.transactions_seq, accounts_address, asset) moves.transactions_seq, accounts_address, asset,
first_value(post_commit_volumes) OVER (
@@ -27,8 +27,11 @@ do $$
group by transactions_seq;

create index moves_view_idx on moves_view(transactions_seq);
+-- speed up hash join when updating rows later
+alter table moves_view add foreign key(transactions_seq) references transactions(seq);

if (select count(*) from moves_view) = 0 then
+drop table moves_view;
return;
end if;

@@ -46,7 +49,10 @@ do $$
from data
where transactions.seq = data.transactions_seq;

-exit when not found;
+if not found then
+drop table moves_view;
+exit;
+end if;

_offset = _offset + _batch_size;

@@ -7,14 +7,18 @@ do $$

drop table if exists txs_view;

-create temp table txs_view as
+create table txs_view as
select *
from transactions
where updated_at is null;

if (select count(*) from txs_view) = 0 then
+drop table txs_view;
return;
end if;
+-- speed up hash join when updating rows later
+create index txs_view_seq_idx on txs_view(seq);
+alter table txs_view add foreign key(seq) references transactions(seq);

perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from txs_view));

@@ -29,10 +33,12 @@ do $$
update transactions
set updated_at = transactions.inserted_at
from data
-where transactions.seq = data.seq and
-transactions.ledger = data.ledger;
+where transactions.seq = data.seq;

-exit when not found;
+if not found then
+drop table txs_view;
+exit;
+end if;

_offset = _offset + _batch_size;

internal/storage/bucket/migrations/34-fix-memento-format/up.sql

@@ -1,7 +1,7 @@
do $$
declare
_offset integer := 0;
-_batch_size integer := 1000;
+_batch_size integer := 10000;
begin
set search_path = '{{ .Schema }}';

@@ -15,9 +15,8 @@ do $$
with data as (
select *
from logs
+where seq >= _offset and seq < _offset + _batch_size
order by seq
-offset _offset
-limit _batch_size
)

Comment on lines 15 to 20
⚠️ Potential issue

Range-based pagination risks skipping rows with sparse seq values

Switching from OFFSET/LIMIT to

where seq >= _offset and seq < _offset + _batch_size

assumes that no gap in seq exceeds _batch_size.
If a gap larger than 10 000 exists (e.g., deleted rows or a sequence reset), the data CTE will return zero rows even though unprocessed rows still exist above the gap. Because exit when not found; triggers on the empty UPDATE, the loop terminates prematurely and leaves those rows un-migrated.

A safer pattern is to drive the loop by the last processed seq, not by a synthetic offset:

-        _offset integer := 0;
+        _last_seq integer := -1;

...
-            with data as (
-                select *
-                from logs
-                where seq >= _offset and seq < _offset + _batch_size
-                order by seq
-            )
+            with data as (
+                select *
+                from logs
+                where seq > _last_seq
+                order by seq
+                limit _batch_size
+            )
...
-            exit when not found;
-
-            _offset = _offset + _batch_size;
+            exit when not found;
+
+            -- advance to the highest seq we just touched
+            _last_seq = (select max(seq) from data);

This iterates until all rows are handled, independent of sparsity.

🤖 Prompt for AI Agents
In internal/storage/bucket/migrations/34-fix-memento-format/up.sql around lines
15 to 20, the current pagination uses a range condition on seq that assumes no
gaps larger than _batch_size, which can cause skipping rows if large gaps exist.
To fix this, refactor the loop to track the last processed seq value and use it
to fetch the next batch with a condition like seq > last_processed_seq ORDER BY
seq LIMIT _batch_size, ensuring all rows are processed regardless of gaps.
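
For reference, the pattern the comment describes can be written as a self-contained keyset-batching loop. The following is only a sketch: the table demo_logs(seq bigint, memento bytea) and the no-op assignment standing in for the real memento rewrite are hypothetical, not the migration's actual schema or logic.

    -- Sketch only: demo_logs is a hypothetical table, not part of the ledger schema.
    do $$
    declare
        _last_seq bigint := -1;            -- keyset cursor: highest seq already processed
        _batch_size integer := 10000;
        _max_in_batch bigint;
    begin
        loop
            -- Take the next batch strictly above the cursor; gaps in seq of any
            -- size fall between batches instead of terminating the loop early.
            with batch as (
                select seq
                from demo_logs
                where seq > _last_seq
                order by seq
                limit _batch_size
            ),
            updated as (
                update demo_logs
                set memento = memento      -- stand-in for the real conversion
                from batch
                where demo_logs.seq = batch.seq
                returning demo_logs.seq
            )
            select max(seq) into _max_in_batch from updated;

            exit when _max_in_batch is null;  -- nothing left above the cursor

            _last_seq := _max_in_batch;
        end loop;
    end
    $$;

Reading the batch's high-water mark back through the update's returning clause keeps everything in one statement; a CTE such as data is only visible inside the statement that defines it, so it cannot be queried again afterwards to advance the cursor.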

update logs
set memento = convert_to(
@@ -82,15 +81,15 @@ do $$
from data
where logs.seq = data.seq;

-exit when not found;
+if _offset >= (select max(seq) from logs) then
+exit;
+end if;

_offset = _offset + _batch_size;

perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);

commit;
end loop;

drop table if exists txs_view;
end
$$;