Skip to content

Commit 9358fe3

Browse files
authored
Primary key check (#573)
1 parent b55c951 commit 9358fe3

File tree

4 files changed

+33
-3
lines changed

4 files changed

+33
-3
lines changed

pgdog/src/backend/replication/logical/error.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::num::ParseIntError;
22

33
use thiserror::Error;
44

5-
use crate::net::ErrorResponse;
5+
use crate::{backend::replication::publisher::PublicationTable, net::ErrorResponse};
66

77
#[derive(Debug, Error)]
88
pub enum Error {
@@ -65,6 +65,9 @@ pub enum Error {
6565

6666
#[error("no replicas available for table sync")]
6767
NoReplicas,
68+
69+
#[error("table {0} doesn't have a primary key")]
70+
NoPrimaryKey(PublicationTable),
6871
}
6972

7073
impl From<ErrorResponse> for Error {

pgdog/src/backend/replication/logical/publisher/queries.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
//! TODO: I think these are Postgres-version specific, so we need to handle that
44
//! later. These were fetched from CREATE SUBSCRIPTION ran on Postgres 17.
55
//!
6+
use std::fmt::Display;
7+
68
use crate::{
79
backend::Server,
810
net::{DataRow, Format},
@@ -28,6 +30,12 @@ pub struct PublicationTable {
2830
pub attributes: String,
2931
}
3032

33+
impl Display for PublicationTable {
34+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35+
write!(f, "\"{}\".\"{}\"", self.schema, self.name)
36+
}
37+
}
38+
3139
impl PublicationTable {
3240
pub async fn load(
3341
publication: &str,

pgdog/src/backend/replication/logical/publisher/table.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ impl Table {
4949
Ok(results)
5050
}
5151

52+
/// Check that the table supports replication.
53+
pub fn valid(&self) -> Result<(), Error> {
54+
if !self.columns.iter().any(|c| c.identity) {
55+
return Err(Error::NoPrimaryKey(self.table.clone()));
56+
}
57+
58+
Ok(())
59+
}
60+
5261
/// Upsert record into table.
5362
pub fn insert(&self, upsert: bool) -> String {
5463
let names = format!(

pgdog/src/backend/replication/logical/subscriber/stream.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use pg_query::{
1313
protobuf::{InsertStmt, ParseResult},
1414
NodeEnum,
1515
};
16-
use tracing::trace;
16+
use tracing::{debug, trace};
1717

1818
use super::super::{publisher::Table, Error};
1919
use crate::{
@@ -95,8 +95,11 @@ impl Statement {
9595
.flatten()
9696
.flatten()
9797
}
98-
}
9998

99+
fn query(&self) -> &str {
100+
&self.query
101+
}
102+
}
100103
#[derive(Debug)]
101104
pub struct StreamSubscriber {
102105
/// Destination cluster.
@@ -303,10 +306,17 @@ impl StreamSubscriber {
303306
if let Some(table) = table {
304307
// Prepare queries for this table. Prepared statements
305308
// are much faster.
309+
310+
table.valid()?;
311+
306312
let insert = Statement::new(&table.insert(false))?;
307313
let upsert = Statement::new(&table.insert(true))?;
308314

309315
for server in &mut self.connections {
316+
for stmt in &[&insert, &upsert] {
317+
debug!("preparing \"{}\" [{}]", stmt.query(), server.addr());
318+
}
319+
310320
server
311321
.send(&vec![insert.parse().into(), upsert.parse().into(), Sync.into()].into())
312322
.await?;

0 commit comments

Comments
 (0)