Skip to content

Commit 08e8dd5

Browse files
committed
feat: add MatchSeq to assert seq
`MatchSeq` specifies the sequence number condition that an operation must satisfy to take effect. In distributed systems, each value stored in the system has an associated sequence number (`seq`) that represents its version. `MatchSeq` provides a way to express conditional operations based on these sequence numbers: - Match any sequence number (unconditional operation) - Match an exact sequence number (compare-and-swap operations) - Match sequence numbers greater than or equal to a value (update existing entries) This is essential for implementing optimistic concurrency control and ensuring consistency in distributed environments.
1 parent 1c75c8d commit 08e8dd5

File tree

7 files changed

+292
-1
lines changed

7 files changed

+292
-1
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,21 @@ edition = "2021"
1010

1111
[dependencies]
1212
async-trait = { version = "0.1.77" }
13+
deepsize = { version = "0.2.0" }
1314
futures = "0.3.24"
1415
futures-util = "0.3.24"
1516
log = { version = "0.4.21", features = ["serde", "kv_unstable_std"] }
1617
serde = { version = "1.0.164", features = ["derive", "rc"] }
1718
stream-more = { version = "0.1.3" }
19+
thiserror = { version = "1" }
1820

1921
[dev-dependencies]
2022
anyhow = { version = "1.0.65" }
23+
async-trait = { version = "0.1.77" }
2124
pretty_assertions = { version = "1.3.0" }
25+
serde_json = { version = "1.0.85" }
2226
tempfile = { version = "3.4.0" }
2327
tokio = { version = "1.35.0", features = ["full"] }
24-
async-trait = { version = "0.1.77" }
2528

2629
[[example]]
2730
name = "basic_usage"

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ pub mod map_api_ro;
7474
pub mod map_key;
7575
pub mod map_value;
7676
pub mod marked;
77+
pub mod match_seq;
7778
pub mod seq_value;
7879
pub mod util;
7980

src/match_seq/errors.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use serde::Deserialize;
16+
use serde::Serialize;
17+
18+
use crate::match_seq::MatchSeq;
19+
20+
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)]
21+
pub enum ConflictSeq {
22+
#[error("ConflictSeq: Want: {want}, Got: {got}")]
23+
NotMatch { want: MatchSeq, got: u64 },
24+
}

src/match_seq/match_seq.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
17+
use serde::Deserialize;
18+
use serde::Serialize;
19+
20+
/// Specifies the sequence number condition that an operation must satisfy to take effect.
21+
///
22+
/// In distributed systems, each value stored in the system has an associated sequence number
23+
/// (`seq`) that represents its version. `MatchSeq` provides a way to express conditional
24+
/// operations based on these sequence numbers:
25+
///
26+
/// - Match any sequence number (unconditional operation)
27+
/// - Match an exact sequence number (compare-and-swap operations)
28+
/// - Match sequence numbers greater than or equal to a value (update existing entries)
29+
///
30+
/// This is essential for implementing optimistic concurrency control and ensuring
31+
/// consistency in distributed environments.
32+
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, deepsize::DeepSizeOf)]
33+
pub enum MatchSeq {
34+
// TODO(xp): remove Any, it is equivalent to GE(0)
35+
/// Any value is acceptable, i.e. does not check seq at all.
36+
Any,
37+
38+
/// To match an exact value of seq.
39+
///
40+
/// E.g., CAS updates the exact version of some value,
41+
/// and put-if-absent adds a value only when seq is 0.
42+
Exact(u64),
43+
44+
/// To match a seq that is greater-or-equal some value.
45+
///
46+
/// E.g., GE(1) perform an update on any existent value.
47+
GE(u64),
48+
}
49+
50+
impl fmt::Display for MatchSeq {
51+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52+
match self {
53+
MatchSeq::Any => {
54+
write!(f, "is any value")
55+
}
56+
MatchSeq::Exact(s) => {
57+
write!(f, "== {}", s)
58+
}
59+
MatchSeq::GE(s) => {
60+
write!(f, ">= {}", s)
61+
}
62+
}
63+
}
64+
}
65+
66+
#[cfg(test)]
67+
mod tests {
68+
use crate::match_seq::MatchSeq;
69+
70+
#[derive(serde::Serialize)]
71+
struct Foo {
72+
f: MatchSeq,
73+
}
74+
75+
#[test]
76+
fn test_match_seq_serde() -> anyhow::Result<()> {
77+
//
78+
79+
let t = Foo { f: MatchSeq::Any };
80+
let s = serde_json::to_string(&t)?;
81+
println!("{s}");
82+
83+
Ok(())
84+
}
85+
86+
#[test]
87+
fn test_match_seq_display() -> anyhow::Result<()> {
88+
assert_eq!("== 3", MatchSeq::Exact(3).to_string());
89+
assert_eq!(">= 3", MatchSeq::GE(3).to_string());
90+
91+
Ok(())
92+
}
93+
}

src/match_seq/match_seq_ext.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::match_seq::errors::ConflictSeq;
16+
17+
/// Check if the sequence number satisfies the condition.
18+
pub trait MatchSeqExt<T> {
19+
/// Match against a some value containing seq by checking if the seq satisfies the condition.
20+
fn match_seq(&self, sv: &T) -> Result<(), ConflictSeq>;
21+
}
22+
23+
#[cfg(test)]
24+
mod tests {
25+
use crate::match_seq::errors::ConflictSeq;
26+
use crate::match_seq::MatchSeq;
27+
use crate::match_seq::MatchSeqExt;
28+
29+
type SeqV = crate::seq_value::SeqV<u64, u64>;
30+
31+
#[test]
32+
fn test_match_seq_match_seq_value() -> anyhow::Result<()> {
33+
assert_eq!(MatchSeq::GE(0).match_seq(&Some(SeqV::new(0, 1))), Ok(()));
34+
assert_eq!(MatchSeq::GE(0).match_seq(&Some(SeqV::new(1, 1))), Ok(()));
35+
36+
//
37+
38+
assert_eq!(
39+
MatchSeq::Exact(3).match_seq(&None::<SeqV>),
40+
Err(ConflictSeq::NotMatch {
41+
want: MatchSeq::Exact(3),
42+
got: 0
43+
})
44+
);
45+
assert_eq!(
46+
MatchSeq::Exact(3).match_seq(&Some(SeqV::new(0, 1))),
47+
Err(ConflictSeq::NotMatch {
48+
want: MatchSeq::Exact(3),
49+
got: 0
50+
})
51+
);
52+
assert_eq!(
53+
MatchSeq::Exact(3).match_seq(&Some(SeqV::new(2, 1))),
54+
Err(ConflictSeq::NotMatch {
55+
want: MatchSeq::Exact(3),
56+
got: 2
57+
})
58+
);
59+
assert_eq!(MatchSeq::Exact(3).match_seq(&Some(SeqV::new(3, 1))), Ok(()));
60+
assert_eq!(
61+
MatchSeq::Exact(3).match_seq(&Some(SeqV::new(4, 1))),
62+
Err(ConflictSeq::NotMatch {
63+
want: MatchSeq::Exact(3),
64+
got: 4
65+
})
66+
);
67+
68+
//
69+
70+
assert_eq!(
71+
MatchSeq::GE(3).match_seq(&None::<SeqV>),
72+
Err(ConflictSeq::NotMatch {
73+
want: MatchSeq::GE(3),
74+
got: 0
75+
})
76+
);
77+
assert_eq!(
78+
MatchSeq::GE(3).match_seq(&Some(SeqV::new(0, 1))),
79+
Err(ConflictSeq::NotMatch {
80+
want: MatchSeq::GE(3),
81+
got: 0
82+
})
83+
);
84+
assert_eq!(
85+
MatchSeq::GE(3).match_seq(&Some(SeqV::new(2, 1))),
86+
Err(ConflictSeq::NotMatch {
87+
want: MatchSeq::GE(3),
88+
got: 2
89+
})
90+
);
91+
assert_eq!(MatchSeq::GE(3).match_seq(&Some(SeqV::new(3, 1))), Ok(()));
92+
assert_eq!(MatchSeq::GE(3).match_seq(&Some(SeqV::new(4, 1))), Ok(()));
93+
94+
Ok(())
95+
}
96+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::match_seq::errors::ConflictSeq;
16+
use crate::match_seq::MatchSeq;
17+
use crate::match_seq::MatchSeqExt;
18+
use crate::seq_value::SeqV;
19+
20+
impl MatchSeqExt<u64> for MatchSeq {
21+
fn match_seq(&self, seq: &u64) -> Result<(), ConflictSeq> {
22+
match self {
23+
MatchSeq::Any => Ok(()),
24+
MatchSeq::Exact(s) if seq == s => Ok(()),
25+
MatchSeq::GE(s) if seq >= s => Ok(()),
26+
_ => Err(ConflictSeq::NotMatch {
27+
want: *self,
28+
got: *seq,
29+
}),
30+
}
31+
}
32+
}
33+
34+
impl<M, T> MatchSeqExt<SeqV<M, T>> for MatchSeq {
35+
fn match_seq(&self, sv: &SeqV<M, T>) -> Result<(), ConflictSeq> {
36+
self.match_seq(&sv.seq)
37+
}
38+
}
39+
40+
impl<M, T> MatchSeqExt<Option<&SeqV<M, T>>> for MatchSeq {
41+
fn match_seq(&self, sv: &Option<&SeqV<M, T>>) -> Result<(), ConflictSeq> {
42+
let seq = sv.map_or(0, |sv| sv.seq);
43+
self.match_seq(&seq)
44+
}
45+
}
46+
47+
impl<M, T> MatchSeqExt<Option<SeqV<M, T>>> for MatchSeq {
48+
fn match_seq(&self, sv: &Option<SeqV<M, T>>) -> Result<(), ConflictSeq> {
49+
self.match_seq(&sv.as_ref())
50+
}
51+
}

src/match_seq/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod errors;
16+
17+
#[allow(lippy::module_inception)]
18+
mod match_seq;
19+
mod match_seq_ext;
20+
mod match_seq_ext_impls;
21+
22+
pub use match_seq::MatchSeq;
23+
pub use match_seq_ext::MatchSeqExt;

0 commit comments

Comments
 (0)