Skip to content

Commit ee1a1d8

Browse files
authored
Add setup command & schema admin to users config (#387)
1 parent b143dcb commit ee1a1d8

File tree

11 files changed

+165
-13
lines changed

11 files changed

+165
-13
lines changed

integration/users.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ name = "pgdog"
88
database = "pgdog_sharded"
99
password = "pgdog"
1010

11+
[[users]]
12+
name = "pgdog_migrator"
13+
database = "pgdog_sharded"
14+
password = "pgdog"
15+
server_user = "pgdog"
16+
schema_admin = true
17+
1118
[[users]]
1219
name = "pgdog"
1320
database = "failover"

pgdog/src/admin/show_pools.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl Command for ShowPools {
4141
Field::numeric("out_of_sync"),
4242
Field::bool("online"),
4343
Field::text("replica_lag"),
44+
Field::bool("schema_admin"),
4445
]);
4546
let mut messages = vec![rd.message()?];
4647
for (user, cluster) in databases().all() {
@@ -71,7 +72,8 @@ impl Command for ShowPools {
7172
.add(state.re_synced)
7273
.add(state.out_of_sync)
7374
.add(state.online)
74-
.add(state.replica_lag.simple_display());
75+
.add(state.replica_lag.simple_display())
76+
.add(cluster.schema_admin());
7577

7678
messages.push(row.message()?);
7779
}

pgdog/src/backend/databases.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,17 @@ impl Databases {
215215
}
216216
}
217217

218+
/// Get the schema owner for this database.
219+
pub fn schema_owner(&self, database: &str) -> Result<Cluster, Error> {
220+
for (user, cluster) in &self.databases {
221+
if cluster.schema_admin() && user.database == database {
222+
return Ok(cluster.clone());
223+
}
224+
}
225+
226+
Err(Error::NoSchemaOwner(database.to_owned()))
227+
}
228+
218229
pub fn mirrors(&self, user: impl ToUser) -> Result<Option<&[Cluster]>, Error> {
219230
let user = user.to_user();
220231
if let Some(cluster) = self.databases.get(&user) {
@@ -443,6 +454,23 @@ pub fn from_config(config: &ConfigAndUsers) -> Databases {
443454
}
444455
}
445456

457+
// Duplicate schema owner check.
458+
let mut dupl_schema_owners = HashMap::<String, usize>::new();
459+
for (user, cluster) in &mut databases {
460+
if cluster.schema_admin() {
461+
let entry = dupl_schema_owners.entry(user.database.clone()).or_insert(0);
462+
*entry += 1;
463+
464+
if *entry > 1 {
465+
warn!(
466+
r#"database "{}" has duplicate schema owner "{}", ignoring setting"#,
467+
user.database, user.user
468+
);
469+
cluster.toggle_schema_admin(false);
470+
}
471+
}
472+
}
473+
446474
let mut mirrors = HashMap::new();
447475

448476
for cluster in databases.values() {

pgdog/src/backend/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ pub enum Error {
4848
#[error("no cluster connected")]
4949
NoCluster,
5050

51+
#[error("database \"{0}\" has no schema owner")]
52+
NoSchemaOwner(String),
53+
5154
#[error("{0}")]
5255
ScramAuth(#[from] crate::auth::scram::Error),
5356

pgdog/src/backend/pool/cluster.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub struct Cluster {
4545
multi_tenant: Option<MultiTenant>,
4646
rw_strategy: ReadWriteStrategy,
4747
rw_split: ReadWriteSplit,
48+
schema_admin: bool,
4849
}
4950

5051
/// Sharding configuration from the cluster.
@@ -81,6 +82,7 @@ pub struct ClusterConfig<'a> {
8182
pub multi_tenant: &'a Option<MultiTenant>,
8283
pub rw_strategy: ReadWriteStrategy,
8384
pub rw_split: ReadWriteSplit,
85+
pub schema_admin: bool,
8486
}
8587

8688
impl<'a> ClusterConfig<'a> {
@@ -105,6 +107,7 @@ impl<'a> ClusterConfig<'a> {
105107
multi_tenant,
106108
rw_strategy: general.read_write_strategy,
107109
rw_split: general.read_write_split,
110+
schema_admin: user.schema_admin,
108111
}
109112
}
110113
}
@@ -125,6 +128,7 @@ impl Cluster {
125128
multi_tenant,
126129
rw_strategy,
127130
rw_split,
131+
schema_admin,
128132
} = config;
129133

130134
Self {
@@ -143,6 +147,7 @@ impl Cluster {
143147
multi_tenant: multi_tenant.clone(),
144148
rw_strategy,
145149
rw_split,
150+
schema_admin,
146151
}
147152
}
148153

@@ -193,6 +198,7 @@ impl Cluster {
193198
multi_tenant: self.multi_tenant.clone(),
194199
rw_strategy: self.rw_strategy,
195200
rw_split: self.rw_split,
201+
schema_admin: self.schema_admin,
196202
}
197203
}
198204

@@ -267,6 +273,16 @@ impl Cluster {
267273
true
268274
}
269275

276+
/// This database/user pair is responsible for schema management.
277+
pub fn schema_admin(&self) -> bool {
278+
self.schema_admin
279+
}
280+
281+
/// Change schema owner attribute.
282+
pub fn toggle_schema_admin(&mut self, owner: bool) {
283+
self.schema_admin = owner;
284+
}
285+
270286
/// We'll need the query router to figure out
271287
/// where a query should go.
272288
pub fn router_needed(&self) -> bool {

pgdog/src/backend/schema/setup.sql

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,25 @@ GRANT USAGE ON SEQUENCE pgdog.validator_bigint_id_seq TO PUBLIC;
4949

5050
-- Generate a primary key from a sequence that will
5151
-- match the shard number this is ran on.
52-
CREATE OR REPLACE FUNCTION pgdog.next_id_auto() RETURNS BIGINT AS $body$
52+
CREATE OR REPLACE FUNCTION pgdog.next_id_seq(sequence_name regclass) RETURNS BIGINT AS $body$
5353
DECLARE next_value BIGINT;
5454
DECLARE seq_oid oid;
5555
DECLARE table_oid oid;
5656
DECLARE shards INTEGER;
5757
DECLARE shard INTEGER;
5858
BEGIN
59-
SELECT 'pgdog.validator_bigint_id_seq'::regclass INTO seq_oid;
59+
SELECT sequence_name INTO seq_oid;
6060
SELECT 'pgdog.validator_bigint'::regclass INTO table_oid;
6161
SELECT
6262
pgdog.config.shard,
6363
pgdog.config.shards
6464
INTO shard, shards
6565
FROM pgdog.config;
6666

67+
IF shards IS NULL OR shard IS NULL THEN
68+
RAISE EXCEPTION 'pgdog.config not set';
69+
END IF;
70+
6771
LOOP
6872
-- This is atomic.
6973
SELECT nextval(seq_oid) INTO next_value;
@@ -75,6 +79,12 @@ BEGIN
7579
END;
7680
$body$ LANGUAGE plpgsql;
7781

82+
CREATE OR REPLACE FUNCTION pgdog.next_id_auto() RETURNS BIGINT AS $body$
83+
BEGIN
84+
RETURN pgdog.next_id_seq('pgdog.validator_bigint_id_seq'::regclass);
85+
END;
86+
$body$ LANGUAGE plpgsql;
87+
7888
-- Generate a primary key from a sequence that will
7989
-- match the shard number this is ran on.
8090
CREATE OR REPLACE FUNCTION pgdog.next_uuid_auto() RETURNS UUID AS $body$
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//! Shard configuration sync.
2+
3+
use tracing::info;
4+
5+
use crate::backend::pool::Request;
6+
use crate::backend::Cluster;
7+
use crate::backend::Error;
8+
use crate::backend::Pool;
9+
use crate::backend::Schema;
10+
11+
pub struct ShardConfig {
12+
shard: usize,
13+
shards: usize,
14+
pool: Pool,
15+
}
16+
17+
impl ShardConfig {
18+
/// Sync schema and set shard config.
19+
pub async fn sync(&self) -> Result<(), Error> {
20+
let mut conn = self.pool.get(&Request::default()).await?;
21+
22+
Schema::setup(&mut conn).await?;
23+
24+
conn.execute("BEGIN").await?;
25+
conn.execute("DELETE FROM pgdog.config").await?;
26+
conn.execute(format!(
27+
"INSERT INTO pgdog.config (shard, shards) VALUES ({}, {})",
28+
self.shard, self.shards
29+
))
30+
.await?;
31+
conn.execute("COMMIT").await?;
32+
33+
Ok(())
34+
}
35+
36+
/// Sync all shards.
37+
pub async fn sync_all(cluster: &Cluster) -> Result<(), Error> {
38+
let shards = cluster.shards().len();
39+
40+
info!("setting up schema on {} shards", shards);
41+
42+
let shards: Vec<_> = cluster
43+
.shards()
44+
.iter()
45+
.filter(|s| s.has_primary())
46+
.map(|s| s.pools().first().unwrap().clone())
47+
.enumerate()
48+
.map(|(shard, pool)| ShardConfig {
49+
pool,
50+
shard,
51+
shards,
52+
})
53+
.collect();
54+
55+
for shard in &shards {
56+
shard.sync().await?;
57+
}
58+
59+
info!("schema setup complete for {} shards", shards.len());
60+
61+
Ok(())
62+
}
63+
}

pgdog/src/backend/schema/sync/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod config;
12
pub mod error;
23
pub mod pg_dump;
34
pub mod progress;

pgdog/src/cli.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use thiserror::Error;
77
use tokio::{select, signal::ctrl_c};
88
use tracing::error;
99

10+
use crate::backend::schema::sync::config::ShardConfig;
1011
use crate::backend::schema::sync::pg_dump::{PgDump, SyncState};
1112
use crate::backend::{databases::databases, replication::logical::Publisher};
1213
use crate::config::{Config, Users};
@@ -94,19 +95,13 @@ pub enum Commands {
9495
/// Source database name.
9596
#[arg(long)]
9697
from_database: String,
97-
/// Source user name.
98-
#[arg(long)]
99-
from_user: String,
10098
/// Publication name.
10199
#[arg(long)]
102100
publication: String,
103101

104102
/// Destination database.
105103
#[arg(long)]
106104
to_database: String,
107-
/// Destination user name.
108-
#[arg(long)]
109-
to_user: String,
110105

111106
/// Dry run. Print schema commands, don't actually execute them.
112107
#[arg(long)]
@@ -120,6 +115,14 @@ pub enum Commands {
120115
#[arg(long)]
121116
data_sync_complete: bool,
122117
},
118+
119+
/// Perform cluster configuration steps
120+
/// required for sharded operations.
121+
Setup {
122+
/// Database name.
123+
#[arg(long)]
124+
database: String,
125+
},
123126
}
124127

125128
/// Fingerprint some queries.
@@ -251,17 +254,15 @@ pub async fn schema_sync(commands: Commands) -> Result<(), Box<dyn std::error::E
251254
let (source, destination, publication, dry_run, ignore_errors, data_sync_complete) =
252255
if let Commands::SchemaSync {
253256
from_database,
254-
from_user,
255257
to_database,
256-
to_user,
257258
publication,
258259
dry_run,
259260
ignore_errors,
260261
data_sync_complete,
261262
} = commands
262263
{
263-
let source = databases().cluster((from_user.as_str(), from_database.as_str()))?;
264-
let dest = databases().cluster((to_user.as_str(), to_database.as_str()))?;
264+
let source = databases().schema_owner(&from_database)?;
265+
let dest = databases().schema_owner(&to_database)?;
265266

266267
(
267268
source,
@@ -283,6 +284,10 @@ pub async fn schema_sync(commands: Commands) -> Result<(), Box<dyn std::error::E
283284
SyncState::PreData
284285
};
285286

287+
if state == SyncState::PreData {
288+
ShardConfig::sync_all(&destination).await?;
289+
}
290+
286291
for output in output {
287292
if dry_run {
288293
let queries = output.statements(state)?;
@@ -296,3 +301,12 @@ pub async fn schema_sync(commands: Commands) -> Result<(), Box<dyn std::error::E
296301

297302
Ok(())
298303
}
304+
305+
pub async fn setup(database: &str) -> Result<(), Box<dyn std::error::Error>> {
306+
let databases = databases();
307+
let schema_owner = databases.schema_owner(database)?;
308+
309+
ShardConfig::sync_all(&schema_owner).await?;
310+
311+
Ok(())
312+
}

pgdog/src/config/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,9 @@ pub struct User {
954954
pub idle_timeout: Option<u64>,
955955
/// Read-only mode.
956956
pub read_only: Option<bool>,
957+
/// Schema owner.
958+
#[serde(default)]
959+
pub schema_admin: bool,
957960
}
958961

959962
impl User {

0 commit comments

Comments
 (0)