Skip to content

Commit e52b52a

Browse files
authored
Merge pull request #2634 from input-output-hk/djo/2534/aggregator-sync-master_follower
feat: Add `CertificateChainSynchronizer` and make follower aggregators start their chain by synchronising with their leader
2 parents b7a272c + 11c8aec commit e52b52a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2381
-543
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ As a minor extension, we have adopted a slightly different versioning convention
3232
- **UNSTABLE** :
3333
- Support for DMQ signature publisher in the signer and signature consumer in the aggregator.
3434

35+
- Implement automatic certificates chain synchronization between leader/follower aggregators.
36+
3537
- Crates versions:
3638

3739
| Crate | Version |

Cargo.lock

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.7.72"
3+
version = "0.7.73"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }
@@ -76,6 +76,8 @@ zstd = { version = "0.13.3", features = ["zstdmt"] }
7676
tikv-jemallocator = { version = "0.6.0", optional = true }
7777

7878
[dev-dependencies]
79+
axum = { version = "0.8.4", features = ["json"] }
80+
axum-test = "17.3.0"
7981
criterion = { version = "0.6.0", features = ["html_reports", "async_tokio"] }
8082
http = "1.3.1"
8183
httpmock = "0.7.0"
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//! Shared `WhereCondition` across certificates queries
2+
3+
use sqlite::Value;
4+
use std::iter::repeat_n;
5+
6+
use mithril_persistence::sqlite::WhereCondition;
7+
8+
use crate::database::record::CertificateRecord;
9+
10+
pub(super) fn insert_many(certificates_records: Vec<CertificateRecord>) -> WhereCondition {
11+
let columns = "(\
12+
certificate_id, \
13+
parent_certificate_id, \
14+
message, \
15+
signature, \
16+
aggregate_verification_key, \
17+
epoch, \
18+
network, \
19+
signed_entity_type_id, \
20+
signed_entity_beacon, \
21+
protocol_version, \
22+
protocol_parameters, \
23+
protocol_message, \
24+
signers, \
25+
initiated_at, \
26+
sealed_at)";
27+
let values_columns: Vec<&str> = repeat_n(
28+
"(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)",
29+
certificates_records.len(),
30+
)
31+
.collect();
32+
33+
let values: Vec<Value> = certificates_records
34+
.into_iter()
35+
.flat_map(|certificate_record| {
36+
vec![
37+
Value::String(certificate_record.certificate_id),
38+
match certificate_record.parent_certificate_id {
39+
Some(parent_certificate_id) => Value::String(parent_certificate_id),
40+
None => Value::Null,
41+
},
42+
Value::String(certificate_record.message),
43+
Value::String(certificate_record.signature),
44+
Value::String(certificate_record.aggregate_verification_key),
45+
Value::Integer(certificate_record.epoch.try_into().unwrap()),
46+
Value::String(certificate_record.network),
47+
Value::Integer(certificate_record.signed_entity_type.index() as i64),
48+
Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()),
49+
Value::String(certificate_record.protocol_version),
50+
Value::String(
51+
serde_json::to_string(&certificate_record.protocol_parameters).unwrap(),
52+
),
53+
Value::String(serde_json::to_string(&certificate_record.protocol_message).unwrap()),
54+
Value::String(serde_json::to_string(&certificate_record.signers).unwrap()),
55+
Value::String(certificate_record.initiated_at.to_rfc3339()),
56+
Value::String(certificate_record.sealed_at.to_rfc3339()),
57+
]
58+
})
59+
.collect();
60+
61+
WhereCondition::new(
62+
format!("{columns} values {}", values_columns.join(", ")).as_str(),
63+
values,
64+
)
65+
}

mithril-aggregator/src/database/query/certificate/insert_certificate.rs

Lines changed: 5 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
use std::iter::repeat_n;
2-
3-
use sqlite::Value;
4-
51
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
62

73
use crate::database::record::CertificateRecord;
84

5+
use super::conditions;
6+
97
/// Query to insert [CertificateRecord] in the sqlite database
108
pub struct InsertCertificateRecordQuery {
119
condition: WhereCondition,
@@ -17,64 +15,9 @@ impl InsertCertificateRecordQuery {
1715
}
1816

1917
pub fn many(certificates_records: Vec<CertificateRecord>) -> Self {
20-
let columns = "(\
21-
certificate_id, \
22-
parent_certificate_id, \
23-
message, \
24-
signature, \
25-
aggregate_verification_key, \
26-
epoch, \
27-
network, \
28-
signed_entity_type_id, \
29-
signed_entity_beacon, \
30-
protocol_version, \
31-
protocol_parameters, \
32-
protocol_message, \
33-
signers, \
34-
initiated_at, \
35-
sealed_at)";
36-
let values_columns: Vec<&str> = repeat_n(
37-
"(?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*, ?*)",
38-
certificates_records.len(),
39-
)
40-
.collect();
41-
42-
let values: Vec<Value> = certificates_records
43-
.into_iter()
44-
.flat_map(|certificate_record| {
45-
vec![
46-
Value::String(certificate_record.certificate_id),
47-
match certificate_record.parent_certificate_id {
48-
Some(parent_certificate_id) => Value::String(parent_certificate_id),
49-
None => Value::Null,
50-
},
51-
Value::String(certificate_record.message),
52-
Value::String(certificate_record.signature),
53-
Value::String(certificate_record.aggregate_verification_key),
54-
Value::Integer(certificate_record.epoch.try_into().unwrap()),
55-
Value::String(certificate_record.network),
56-
Value::Integer(certificate_record.signed_entity_type.index() as i64),
57-
Value::String(certificate_record.signed_entity_type.get_json_beacon().unwrap()),
58-
Value::String(certificate_record.protocol_version),
59-
Value::String(
60-
serde_json::to_string(&certificate_record.protocol_parameters).unwrap(),
61-
),
62-
Value::String(
63-
serde_json::to_string(&certificate_record.protocol_message).unwrap(),
64-
),
65-
Value::String(serde_json::to_string(&certificate_record.signers).unwrap()),
66-
Value::String(certificate_record.initiated_at.to_rfc3339()),
67-
Value::String(certificate_record.sealed_at.to_rfc3339()),
68-
]
69-
})
70-
.collect();
71-
72-
let condition = WhereCondition::new(
73-
format!("{columns} values {}", values_columns.join(", ")).as_str(),
74-
values,
75-
);
76-
77-
Self { condition }
18+
Self {
19+
condition: conditions::insert_many(certificates_records),
20+
}
7821
}
7922
}
8023

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
2+
3+
use crate::database::record::CertificateRecord;
4+
5+
use super::conditions;
6+
7+
/// Query to insert or replace [CertificateRecord] in the sqlite database
8+
pub struct InsertOrReplaceCertificateRecordQuery {
9+
condition: WhereCondition,
10+
}
11+
12+
impl InsertOrReplaceCertificateRecordQuery {
13+
pub fn many(certificates_records: Vec<CertificateRecord>) -> Self {
14+
Self {
15+
condition: conditions::insert_many(certificates_records),
16+
}
17+
}
18+
}
19+
20+
impl Query for InsertOrReplaceCertificateRecordQuery {
21+
type Entity = CertificateRecord;
22+
23+
fn filters(&self) -> WhereCondition {
24+
self.condition.clone()
25+
}
26+
27+
fn get_definition(&self, condition: &str) -> String {
28+
// it is important to alias the fields with the same name as the table
29+
// since the table cannot be aliased in a RETURNING statement in SQLite.
30+
let projection = Self::Entity::get_projection()
31+
.expand(SourceAlias::new(&[("{:certificate:}", "certificate")]));
32+
33+
format!("insert or replace into certificate {condition} returning {projection}")
34+
}
35+
}
36+
37+
#[cfg(test)]
38+
mod tests {
39+
use std::collections::HashMap;
40+
41+
use mithril_common::crypto_helper::tests_setup::setup_certificate_chain;
42+
use mithril_common::entities::Epoch;
43+
use mithril_common::test_utils::fake_data;
44+
use mithril_persistence::sqlite::ConnectionExtensions;
45+
46+
use crate::database::query::{GetCertificateRecordQuery, InsertCertificateRecordQuery};
47+
use crate::database::test_helper::main_db_connection;
48+
49+
use super::*;
50+
51+
#[test]
52+
fn test_insert_many_certificates_records_in_empty_db() {
53+
let certificates = setup_certificate_chain(5, 2);
54+
let certificates_records: Vec<CertificateRecord> = certificates.into();
55+
56+
let connection = main_db_connection().unwrap();
57+
58+
let certificates_records_saved: Vec<CertificateRecord> = connection
59+
.fetch_collect(InsertOrReplaceCertificateRecordQuery::many(
60+
certificates_records.clone(),
61+
))
62+
.expect("saving many records should not fail");
63+
64+
assert_eq!(certificates_records, certificates_records_saved);
65+
66+
// Check insertion order
67+
let all_records: Vec<CertificateRecord> =
68+
connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap();
69+
assert_eq!(
70+
certificates_records.into_iter().rev().collect::<Vec<_>>(),
71+
all_records
72+
);
73+
}
74+
75+
#[test]
76+
fn test_replace_one_certificate_record() {
77+
let certificate_record = CertificateRecord {
78+
epoch: Epoch(12),
79+
..fake_data::certificate("hash").into()
80+
};
81+
82+
let connection = main_db_connection().unwrap();
83+
let certificate_record_saved = connection
84+
.fetch_first(InsertCertificateRecordQuery::one(
85+
certificate_record.clone(),
86+
))
87+
.unwrap();
88+
assert_eq!(Some(Epoch(12)), certificate_record_saved.map(|r| r.epoch));
89+
90+
let modified_certificate_record = CertificateRecord {
91+
epoch: Epoch(23),
92+
..certificate_record
93+
};
94+
let certificate_record_saved = connection
95+
.fetch_first(InsertOrReplaceCertificateRecordQuery::many(vec![
96+
modified_certificate_record.clone(),
97+
]))
98+
.unwrap();
99+
assert_eq!(Some(Epoch(23)), certificate_record_saved.map(|r| r.epoch));
100+
101+
let all_records_cursor = connection.fetch(GetCertificateRecordQuery::all()).unwrap();
102+
assert_eq!(1, all_records_cursor.count());
103+
}
104+
105+
#[test]
106+
fn test_insert_and_replace_many_certificate_record() {
107+
let tested_records: HashMap<_, CertificateRecord> = HashMap::from([
108+
(
109+
"cert1-genesis",
110+
fake_data::genesis_certificate("genesis").into(),
111+
),
112+
("cert2", fake_data::certificate("cert2").into()),
113+
(
114+
"cert2-modified",
115+
CertificateRecord {
116+
epoch: Epoch(14),
117+
..fake_data::certificate("cert2").into()
118+
},
119+
),
120+
("cert3", fake_data::certificate("cert3").into()),
121+
("cert4", fake_data::certificate("cert4").into()),
122+
(
123+
"cert4-modified",
124+
CertificateRecord {
125+
epoch: Epoch(32),
126+
..fake_data::certificate("cert4").into()
127+
},
128+
),
129+
("cert5", fake_data::certificate("cert5").into()),
130+
]);
131+
let connection = main_db_connection().unwrap();
132+
133+
let cursor = connection
134+
.fetch(InsertCertificateRecordQuery::many(vec![
135+
tested_records["cert1-genesis"].clone(),
136+
tested_records["cert2"].clone(),
137+
tested_records["cert3"].clone(),
138+
tested_records["cert4"].clone(),
139+
tested_records["cert5"].clone(),
140+
]))
141+
.unwrap();
142+
assert_eq!(5, cursor.count());
143+
144+
let cursor = connection
145+
.fetch(InsertOrReplaceCertificateRecordQuery::many(vec![
146+
tested_records["cert1-genesis"].clone(),
147+
tested_records["cert2-modified"].clone(),
148+
tested_records["cert3"].clone(),
149+
tested_records["cert4-modified"].clone(),
150+
]))
151+
.unwrap();
152+
assert_eq!(4, cursor.count());
153+
154+
let all_records: Vec<CertificateRecord> =
155+
connection.fetch_collect(GetCertificateRecordQuery::all()).unwrap();
156+
assert_eq!(5, all_records.len());
157+
assert_eq!(
158+
all_records,
159+
vec![
160+
tested_records["cert4-modified"].clone(),
161+
tested_records["cert3"].clone(),
162+
tested_records["cert2-modified"].clone(),
163+
tested_records["cert1-genesis"].clone(),
164+
// Since the cert5 was not in the Insert/replace query, it now has a lower rowid and shows first
165+
tested_records["cert5"].clone(),
166+
]
167+
);
168+
}
169+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
mod conditions;
12
mod delete_certificate;
23
mod get_certificate;
34
mod get_master_certificate;
45
mod insert_certificate;
6+
mod insert_or_replace_certificate;
57

68
pub use delete_certificate::*;
79
pub use get_certificate::*;
810
pub use get_master_certificate::*;
911
pub use insert_certificate::*;
12+
pub use insert_or_replace_certificate::*;

0 commit comments

Comments
 (0)