@@ -4,6 +4,137 @@ declare
44 r record;
55begin
66 set local search_path = ' ' ;
7+
8+ /*
9+ Override the pgmq.drop_queue to check if relevant tables are owned
10+ by the pgmq extension before attempting to run
11+ `alter extension pgmq drop table ...`
12+ this is necessary becasue, to enable nightly logical backups to include user queues
13+ we automatically detach them from pgmq.
14+
15+ this update is backwards compatible with version 1.4.4 but should be removed once we're on
16+ physical backups everywhere
17+ */
18+ -- Detach and delete the official function
19+ alter extension pgmq drop function pgmq .drop_queue ;
20+ drop function pgmq .drop_queue ;
21+
22+ -- Create and reattach the patched function
23+ CREATE FUNCTION pgmq .drop_queue(queue_name TEXT )
24+ RETURNS BOOLEAN AS $func$
25+ DECLARE
26+ qtable TEXT := pgmq .format_table_name (queue_name, ' q' );
27+ qtable_seq TEXT := qtable || ' _msg_id_seq' ;
28+ fq_qtable TEXT := ' pgmq.' || qtable;
29+ atable TEXT := pgmq .format_table_name (queue_name, ' a' );
30+ fq_atable TEXT := ' pgmq.' || atable;
31+ partitioned BOOLEAN ;
32+ BEGIN
33+ EXECUTE FORMAT(
34+ $QUERY$
35+ SELECT is_partitioned FROM pgmq .meta WHERE queue_name = %L
36+ $QUERY$,
37+ queue_name
38+ ) INTO partitioned;
39+
40+ -- NEW CONDITIONAL CHECK
41+ if exists (
42+ select 1
43+ from pg_class c
44+ join pg_depend d on c .oid = d .objid
45+ join pg_extension e on d .refobjid = e .oid
46+ where c .relname = qtable and e .extname = ' pgmq'
47+ ) then
48+
49+ EXECUTE FORMAT(
50+ $QUERY$
51+ ALTER EXTENSION pgmq DROP TABLE pgmq .%I
52+ $QUERY$,
53+ qtable
54+ );
55+
56+ end if;
57+
58+ -- NEW CONDITIONAL CHECK
59+ if exists (
60+ select 1
61+ from pg_class c
62+ join pg_depend d on c .oid = d .objid
63+ join pg_extension e on d .refobjid = e .oid
64+ where c .relname = qtable_seq and e .extname = ' pgmq'
65+ ) then
66+ EXECUTE FORMAT(
67+ $QUERY$
68+ ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I
69+ $QUERY$,
70+ qtable_seq
71+ );
72+
73+ end if;
74+
75+ -- NEW CONDITIONAL CHECK
76+ if exists (
77+ select 1
78+ from pg_class c
79+ join pg_depend d on c .oid = d .objid
80+ join pg_extension e on d .refobjid = e .oid
81+ where c .relname = atable and e .extname = ' pgmq'
82+ ) then
83+
84+ EXECUTE FORMAT(
85+ $QUERY$
86+ ALTER EXTENSION pgmq DROP TABLE pgmq .%I
87+ $QUERY$,
88+ atable
89+ );
90+
91+ end if;
92+
93+ -- NO CHANGES PAST THIS POINT
94+
95+ EXECUTE FORMAT(
96+ $QUERY$
97+ DROP TABLE IF EXISTS pgmq.%I
98+ $QUERY$,
99+ qtable
100+ );
101+
102+ EXECUTE FORMAT(
103+ $QUERY$
104+ DROP TABLE IF EXISTS pgmq.%I
105+ $QUERY$,
106+ atable
107+ );
108+
109+ IF EXISTS (
110+ SELECT 1
111+ FROM information_schema .tables
112+ WHERE table_name = ' meta' and table_schema = ' pgmq'
113+ ) THEN
114+ EXECUTE FORMAT(
115+ $QUERY$
116+ DELETE FROM pgmq .meta WHERE queue_name = %L
117+ $QUERY$,
118+ queue_name
119+ );
120+ END IF;
121+
122+ IF partitioned THEN
123+ EXECUTE FORMAT(
124+ $QUERY$
125+ DELETE FROM %I .part_config where parent_table in (%L, %L)
126+ $QUERY$,
127+ pgmq ._get_pg_partman_schema (), fq_qtable, fq_atable
128+ );
129+ END IF;
130+
131+ RETURN TRUE;
132+ END;
133+ $func$ LANGUAGE plpgsql;
134+
135+ alter extension pgmq add function pgmq .drop_queue ;
136+
137+
7138 update pg_extension set extowner = ' postgres' ::regrole where extname = ' pgmq' ;
8139 for r in (select * from pg_depend where refobjid = extoid) loop
9140 if r .classid = ' pg_type' ::regclass then
0 commit comments