Skip to content

Commit eba8dc6

Browse files
committed
Problem: inability to fine-tune transaction boundaries easily
Solution: implement an interface for sub-transactions It's a fairly simplistic interface at this point and it allows choosing a mode of release on drop (commit by default). This is an improved version of the original implementation in pgx-contrib-spiext (https://github.com/supabase/pgx-contrib-spiext/blob/main/src/subtxn.rs)
1 parent e460d80 commit eba8dc6

File tree

5 files changed

+304
-0
lines changed

5 files changed

+304
-0
lines changed

pgx-tests/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod shmem_tests;
4040
mod spi_tests;
4141
mod srf_tests;
4242
mod struct_type_tests;
43+
mod subxact_tests;
4344
mod trigger_tests;
4445
mod uuid_tests;
4546
mod variadic_tests;
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Portions Copyright 2019-2021 ZomboDB, LLC.
3+
Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <[email protected]>
4+
5+
All rights reserved.
6+
7+
Use of this source code is governed by the MIT license that can be found in the LICENSE file.
8+
*/
9+
10+
#[cfg(any(test, feature = "pg_test"))]
11+
#[pgx::pg_schema]
12+
mod tests {
13+
#[allow(unused_imports)]
14+
use crate as pgx_tests;
15+
16+
use pgx::prelude::*;
17+
use pgx::SpiClient;
18+
19+
#[pg_test]
20+
fn test_subxact_smoketest() {
21+
Spi::execute(|c| {
22+
c.update("CREATE TABLE a (v INTEGER)", None, None);
23+
let c = c.sub_transaction(|xact| {
24+
xact.update("INSERT INTO a VALUES (0)", None, None);
25+
assert_eq!(
26+
0,
27+
xact.select("SELECT v FROM a", Some(1), None)
28+
.first()
29+
.get_datum::<i32>(1)
30+
.unwrap()
31+
);
32+
let xact = xact.sub_transaction(|xact| {
33+
xact.update("INSERT INTO a VALUES (1)", None, None);
34+
assert_eq!(
35+
2,
36+
xact.select("SELECT COUNT(*) FROM a", Some(1), None)
37+
.first()
38+
.get_datum::<i32>(1)
39+
.unwrap()
40+
);
41+
xact.rollback()
42+
});
43+
xact.rollback()
44+
});
45+
assert_eq!(
46+
0,
47+
c.select("SELECT COUNT(*) FROM a", Some(1), None)
48+
.first()
49+
.get_datum::<i32>(1)
50+
.unwrap()
51+
);
52+
})
53+
}
54+
55+
#[pg_test]
56+
fn test_commit_on_drop() {
57+
Spi::execute(|c| {
58+
c.update("CREATE TABLE a (v INTEGER)", None, None);
59+
// The type below is explicit to ensure it's commit on drop by default
60+
c.sub_transaction(|xact: SubTransaction<SpiClient, true>| {
61+
xact.update("INSERT INTO a VALUES (0)", None, None);
62+
// Dropped explicitly for illustration purposes
63+
drop(xact);
64+
});
65+
// Create a new client to check the state
66+
Spi::execute(|c| {
67+
// The above insert should have been committed
68+
assert_eq!(
69+
1,
70+
c.select("SELECT COUNT(*) FROM a", Some(1), None)
71+
.first()
72+
.get_datum::<i32>(1)
73+
.unwrap()
74+
);
75+
});
76+
})
77+
}
78+
79+
#[pg_test]
80+
fn test_rollback_on_drop() {
81+
Spi::execute(|c| {
82+
c.update("CREATE TABLE a (v INTEGER)", None, None);
83+
// The type below is explicit to ensure it's commit on drop by default
84+
c.sub_transaction(|xact: SubTransaction<SpiClient, true>| {
85+
xact.update("INSERT INTO a VALUES (0)", None, None);
86+
let xact = xact.rollback_on_drop();
87+
// Dropped explicitly for illustration purposes
88+
drop(xact);
89+
});
90+
// Create a new client to check the state
91+
Spi::execute(|c| {
92+
// The above insert should NOT have been committed
93+
assert_eq!(
94+
0,
95+
c.select("SELECT COUNT(*) FROM a", Some(1), None)
96+
.first()
97+
.get_datum::<i32>(1)
98+
.unwrap()
99+
);
100+
});
101+
})
102+
}
103+
}

pgx/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub mod shmem;
6363
pub mod spi;
6464
pub mod spinlock;
6565
pub mod stringinfo;
66+
pub mod subxact;
6667
pub mod trigger_support;
6768
pub mod tupdesc;
6869
pub mod varlena;

pgx/src/prelude.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,6 @@ pub use crate::pg_sys::{
3636
check_for_interrupts, debug1, debug2, debug3, debug4, debug5, ereport, error, function_name,
3737
info, log, notice, warning, FATAL, PANIC,
3838
};
39+
40+
// Sub-transactions
41+
pub use crate::subxact::*;

pgx/src/subxact.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
use crate::{pg_sys, PgMemoryContexts, SpiClient};
2+
use std::fmt::{Debug, Formatter};
3+
use std::ops::Deref;
4+
5+
/// Sub-transaction
6+
///
7+
/// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction<Parent>::sub_transaction`
8+
/// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure.
9+
///
10+
/// Unless rolled back or committed explicitly, it'll commit if `COMMIT` generic parameter is `true`
11+
/// (default) or roll back if it is `false`.
12+
pub struct SubTransaction<Parent: SubTransactionExt, const COMMIT: bool = true> {
13+
memory_context: pg_sys::MemoryContext,
14+
resource_owner: pg_sys::ResourceOwner,
15+
// Should the transaction be released, or was it already committed or rolled back?
16+
//
17+
// The reason we are not calling this `released` as we're also using this flag when
18+
// we convert between commit_on_drop and rollback_on_drop to ensure it doesn't get released
19+
// on the drop of the original value.
20+
should_release: bool,
21+
parent: Option<Parent>,
22+
}
23+
24+
impl<Parent: SubTransactionExt, const COMMIT: bool> Debug for SubTransaction<Parent, COMMIT> {
25+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26+
f.write_str(std::any::type_name::<Self>())
27+
}
28+
}
29+
30+
impl<Parent: SubTransactionExt, const COMMIT: bool> SubTransaction<Parent, COMMIT> {
31+
/// Create a new sub-transaction.
32+
fn new(parent: Parent) -> Self {
33+
// Remember the memory context before starting the sub-transaction
34+
let ctx = PgMemoryContexts::CurrentMemoryContext.value();
35+
// Remember resource owner before starting the sub-transaction
36+
let resource_owner = unsafe { pg_sys::CurrentResourceOwner };
37+
unsafe {
38+
pg_sys::BeginInternalSubTransaction(std::ptr::null());
39+
}
40+
// Switch to the outer memory context so that all allocations remain
41+
// there instead of the sub-transaction's context
42+
PgMemoryContexts::For(ctx).set_as_current();
43+
Self { memory_context: ctx, should_release: true, resource_owner, parent: Some(parent) }
44+
}
45+
46+
/// Commit the transaction, returning its parent
47+
pub fn commit(mut self) -> Parent {
48+
self.internal_commit();
49+
self.should_release = false;
50+
self.parent.take().unwrap()
51+
}
52+
53+
/// Rollback the transaction, returning its parent
54+
pub fn rollback(mut self) -> Parent {
55+
self.internal_rollback();
56+
self.should_release = false;
57+
self.parent.take().unwrap()
58+
}
59+
60+
/// Returns the memory context this transaction is in
61+
pub fn memory_context(&self) -> PgMemoryContexts {
62+
PgMemoryContexts::For(self.memory_context)
63+
}
64+
65+
fn internal_rollback(&self) {
66+
unsafe {
67+
pg_sys::RollbackAndReleaseCurrentSubTransaction();
68+
pg_sys::CurrentResourceOwner = self.resource_owner;
69+
}
70+
PgMemoryContexts::For(self.memory_context).set_as_current();
71+
}
72+
73+
fn internal_commit(&self) {
74+
unsafe {
75+
pg_sys::ReleaseCurrentSubTransaction();
76+
pg_sys::CurrentResourceOwner = self.resource_owner;
77+
}
78+
PgMemoryContexts::For(self.memory_context).set_as_current();
79+
}
80+
}
81+
82+
impl<Parent: SubTransactionExt> SubTransaction<Parent, true> {
83+
/// Make this sub-transaction roll back on drop
84+
pub fn rollback_on_drop(self) -> SubTransaction<Parent, false> {
85+
self.into()
86+
}
87+
}
88+
89+
impl<Parent: SubTransactionExt> SubTransaction<Parent, false> {
90+
/// Make this sub-transaction commit on drop
91+
pub fn commit_on_drop(self) -> SubTransaction<Parent, true> {
92+
self.into()
93+
}
94+
}
95+
96+
impl<Parent: SubTransactionExt> Into<SubTransaction<Parent, false>>
97+
for SubTransaction<Parent, true>
98+
{
99+
fn into(mut self) -> SubTransaction<Parent, false> {
100+
let result = SubTransaction {
101+
memory_context: self.memory_context,
102+
resource_owner: self.resource_owner,
103+
should_release: self.should_release,
104+
parent: self.parent.take(),
105+
};
106+
// Make sure original sub-transaction won't commit
107+
self.should_release = false;
108+
result
109+
}
110+
}
111+
112+
impl<Parent: SubTransactionExt> Into<SubTransaction<Parent, true>>
113+
for SubTransaction<Parent, false>
114+
{
115+
fn into(mut self) -> SubTransaction<Parent, true> {
116+
let result = SubTransaction {
117+
memory_context: self.memory_context,
118+
resource_owner: self.resource_owner,
119+
should_release: self.should_release,
120+
parent: self.parent.take(),
121+
};
122+
// Make sure original sub-transaction won't roll back
123+
self.should_release = false;
124+
result
125+
}
126+
}
127+
128+
impl<Parent: SubTransactionExt, const COMMIT: bool> Drop for SubTransaction<Parent, COMMIT> {
129+
fn drop(&mut self) {
130+
if self.should_release {
131+
if COMMIT {
132+
self.internal_commit();
133+
} else {
134+
self.internal_rollback();
135+
}
136+
}
137+
}
138+
}
139+
140+
// This allows SubTransaction to be de-referenced to SpiClient
141+
impl<'conn, const COMMIT: bool> Deref for SubTransaction<SpiClient<'conn>, COMMIT> {
142+
type Target = SpiClient<'conn>;
143+
144+
fn deref(&self) -> &Self::Target {
145+
self.parent.as_ref().unwrap()
146+
}
147+
}
148+
149+
// This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient
150+
impl<Parent: SubTransactionExt, const COMMIT: bool> Deref
151+
for SubTransaction<SubTransaction<Parent>, COMMIT>
152+
{
153+
type Target = Parent;
154+
155+
fn deref(&self) -> &Self::Target {
156+
self.parent.as_ref().and_then(|p| p.parent.as_ref()).unwrap()
157+
}
158+
}
159+
160+
/// Trait that allows creating a sub-transaction off any type
161+
pub trait SubTransactionExt {
162+
/// Parent's type
163+
///
164+
/// In most common cases, it'll be equal to `Self`. However, in some cases
165+
/// it may be desirable to use a different type to achieve certain goals.
166+
type T: SubTransactionExt;
167+
168+
/// Consume `self` and execute a closure with a sub-transaction
169+
///
170+
/// If further use of the given sub-transaction is necessary, it must
171+
/// be returned by the closure alongside with its intended result. Otherwise,
172+
/// the sub-transaction be released when dropped.
173+
fn sub_transaction<F: FnOnce(SubTransaction<Self::T>) -> R, R>(self, f: F) -> R
174+
where
175+
Self: Sized;
176+
}
177+
178+
impl<'a> SubTransactionExt for SpiClient<'a> {
179+
type T = SpiClient<'a>;
180+
fn sub_transaction<F: FnOnce(SubTransaction<Self::T>) -> R, R>(self, f: F) -> R
181+
where
182+
Self: Sized,
183+
{
184+
f(SubTransaction::new(self))
185+
}
186+
}
187+
188+
impl<Parent: SubTransactionExt> SubTransactionExt for SubTransaction<Parent> {
189+
type T = SubTransaction<Parent>;
190+
fn sub_transaction<F: FnOnce(SubTransaction<Self::T>) -> R, R>(self, f: F) -> R
191+
where
192+
Self: Sized,
193+
{
194+
f(SubTransaction::new(self))
195+
}
196+
}

0 commit comments

Comments
 (0)