Skip to content

Commit e3c1b3b

Browse files
committed
Add a total order bound on persist
Persist already de facto expects timestamps to be totally ordered, and requiring this explicitly (via a `TotalOrder` bound) simplifies some logic.
1 parent 25eb775 commit e3c1b3b

File tree

11 files changed

+28
-171
lines changed

11 files changed

+28
-171
lines changed

src/persist-client/src/batch.rs

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,7 +1414,6 @@ fn diffs_sum<D: Semigroup + Codec64>(updates: &Int64Array) -> Option<D> {
14141414
#[cfg(test)]
14151415
mod tests {
14161416
use mz_dyncfg::ConfigUpdates;
1417-
use timely::order::Product;
14181417

14191418
use super::*;
14201419
use crate::PersistLocation;
@@ -1596,50 +1595,6 @@ mod tests {
15961595
assert!(part_bytes.is_none());
15971596
}
15981597

1599-
#[mz_ore::test(tokio::test)]
1600-
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1601-
async fn batch_builder_partial_order() {
1602-
let cache = PersistClientCache::new_no_metrics();
1603-
// Set blob_target_size to 0 so that each row gets forced into its own batch part
1604-
cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
1605-
// Otherwise fails: expected hollow part!
1606-
cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
1607-
cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
1608-
cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
1609-
let client = cache
1610-
.open(PersistLocation::new_in_mem())
1611-
.await
1612-
.expect("client construction failed");
1613-
let shard_id = ShardId::new();
1614-
let (mut write, _) = client
1615-
.expect_open::<String, String, Product<u32, u32>, i64>(shard_id)
1616-
.await;
1617-
1618-
let batch = write
1619-
.batch(
1620-
&[
1621-
(("1".to_owned(), "one".to_owned()), Product::new(0, 10), 1),
1622-
(("2".to_owned(), "two".to_owned()), Product::new(10, 0), 1),
1623-
],
1624-
Antichain::from_elem(Product::new(0, 0)),
1625-
Antichain::from_iter([Product::new(0, 11), Product::new(10, 1)]),
1626-
)
1627-
.await
1628-
.expect("invalid usage");
1629-
1630-
assert_eq!(batch.batch.part_count(), 2);
1631-
for part in &batch.batch.parts {
1632-
let part = part.expect_hollow_part();
1633-
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
1634-
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
1635-
assert_eq!(shard.to_string(), shard_id.to_string());
1636-
assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
1637-
}
1638-
_ => panic!("unparseable blob key"),
1639-
}
1640-
}
1641-
}
1642-
16431598
#[mz_ore::test]
16441599
fn untrimmable_columns() {
16451600
let untrimmable = UntrimmableColumns {

src/persist-client/src/internal/compact.rs

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,6 @@ mod tests {
942942
use mz_dyncfg::ConfigUpdates;
943943
use mz_ore::{assert_contains, assert_err};
944944
use mz_persist_types::codec_impls::StringSchema;
945-
use timely::order::Product;
946945
use timely::progress::Antichain;
947946

948947
use crate::PersistLocation;
@@ -1021,83 +1020,6 @@ mod tests {
10211020
assert_eq!(updates, all_ok(&data, 10));
10221021
}
10231022

1024-
#[mz_persist_proc::test(tokio::test)]
1025-
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1026-
async fn compaction_partial_order(dyncfgs: ConfigUpdates) {
1027-
let data = vec![
1028-
(("0".to_owned(), "zero".to_owned()), Product::new(0, 10), 1),
1029-
(("1".to_owned(), "one".to_owned()), Product::new(10, 0), 1),
1030-
];
1031-
1032-
let cache = new_test_client_cache(&dyncfgs);
1033-
cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1034-
let (mut write, _) = cache
1035-
.open(PersistLocation::new_in_mem())
1036-
.await
1037-
.expect("client construction failed")
1038-
.expect_open::<String, String, Product<u32, u32>, i64>(ShardId::new())
1039-
.await;
1040-
let b0 = write
1041-
.batch(
1042-
&data[..1],
1043-
Antichain::from_elem(Product::new(0, 0)),
1044-
Antichain::from_iter([Product::new(0, 11), Product::new(10, 0)]),
1045-
)
1046-
.await
1047-
.expect("invalid usage")
1048-
.into_hollow_batch();
1049-
1050-
let b1 = write
1051-
.batch(
1052-
&data[1..],
1053-
Antichain::from_iter([Product::new(0, 11), Product::new(10, 0)]),
1054-
Antichain::from_elem(Product::new(10, 1)),
1055-
)
1056-
.await
1057-
.expect("invalid usage")
1058-
.into_hollow_batch();
1059-
1060-
let req = CompactReq {
1061-
shard_id: write.machine.shard_id(),
1062-
desc: Description::new(
1063-
b0.desc.lower().clone(),
1064-
b1.desc.upper().clone(),
1065-
Antichain::from_elem(Product::new(10, 0)),
1066-
),
1067-
inputs: vec![b0, b1],
1068-
};
1069-
let schemas = Schemas {
1070-
id: None,
1071-
key: Arc::new(StringSchema),
1072-
val: Arc::new(StringSchema),
1073-
};
1074-
let res = Compactor::<String, String, Product<u32, u32>, i64>::compact(
1075-
CompactConfig::new(&write.cfg, write.shard_id()),
1076-
Arc::clone(&write.blob),
1077-
Arc::clone(&write.metrics),
1078-
write.metrics.shards.shard(&write.machine.shard_id(), ""),
1079-
Arc::new(IsolatedRuntime::default()),
1080-
req.clone(),
1081-
schemas.clone(),
1082-
)
1083-
.await
1084-
.expect("compaction failed");
1085-
1086-
assert_eq!(res.output.desc, req.desc);
1087-
assert_eq!(res.output.len, 2);
1088-
assert_eq!(res.output.part_count(), 1);
1089-
let part = res.output.parts[0].expect_hollow_part();
1090-
let (part, updates) = expect_fetch_part(
1091-
write.blob.as_ref(),
1092-
&part.key.complete(&write.machine.shard_id()),
1093-
&write.metrics,
1094-
&schemas,
1095-
)
1096-
.await;
1097-
assert_eq!(part.desc, res.output.desc);
1098-
assert_eq!(updates, all_ok(&data, Product::new(10, 0)));
1099-
}
1100-
11011023
#[mz_persist_proc::test(tokio::test)]
11021024
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
11031025
async fn disable_compaction(dyncfgs: ConfigUpdates) {

src/persist-client/src/internal/state_diff.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,9 +1033,6 @@ fn sniff_compaction<'a, T: Timestamp + Lattice>(
10331033
///
10341034
/// This can only happen when the batch needing to be split is empty, so error
10351035
/// out if it isn't because that means something unexpected is going on.
1036-
///
1037-
/// TODO: This implementation is certainly not correct if T is actually only
1038-
/// partially ordered.
10391036
fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
10401037
metrics: &Metrics,
10411038
mut trace: Vec<HollowBatch<T>>,

src/persist-client/src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use mz_persist_types::schema::SchemaId;
3030
use mz_persist_types::{Codec, Codec64, Opaque};
3131
use mz_proto::{IntoRustIfSome, ProtoType};
3232
use semver::Version;
33+
use timely::order::TotalOrder;
3334
use timely::progress::{Antichain, Timestamp};
3435

3536
use crate::async_runtime::IsolatedRuntime;
@@ -290,7 +291,7 @@ impl PersistClient {
290291
where
291292
K: Debug + Codec,
292293
V: Debug + Codec,
293-
T: Timestamp + Lattice + Codec64 + Sync,
294+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
294295
D: Semigroup + Ord + Codec64 + Send + Sync,
295296
{
296297
Ok((
@@ -332,7 +333,7 @@ impl PersistClient {
332333
where
333334
K: Debug + Codec,
334335
V: Debug + Codec,
335-
T: Timestamp + Lattice + Codec64 + Sync,
336+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
336337
D: Semigroup + Codec64 + Send + Sync,
337338
{
338339
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
@@ -504,7 +505,7 @@ impl PersistClient {
504505
where
505506
K: Debug + Codec,
506507
V: Debug + Codec,
507-
T: Timestamp + Lattice + Codec64 + Sync,
508+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
508509
D: Semigroup + Ord + Codec64 + Send + Sync,
509510
{
510511
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
@@ -566,7 +567,7 @@ impl PersistClient {
566567
where
567568
K: Debug + Codec,
568569
V: Debug + Codec,
569-
T: Timestamp + Lattice + Codec64 + Sync,
570+
T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
570571
D: Semigroup + Ord + Codec64 + Send + Sync,
571572
{
572573
WriteHandle::builder_inner(
@@ -652,7 +653,7 @@ impl PersistClient {
652653
where
653654
K: Debug + Codec + Ord,
654655
V: Debug + Codec + Ord,
655-
T: Timestamp + Lattice + Codec64 + Sync,
656+
T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
656657
D: Semigroup + Ord + Codec64 + Send + Sync,
657658
{
658659
let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
@@ -839,7 +840,7 @@ impl PersistClient {
839840
where
840841
K: Debug + Codec,
841842
V: Debug + Codec,
842-
T: Timestamp + Lattice + Codec64 + Sync,
843+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
843844
D: Semigroup + Ord + Codec64 + Send + Sync,
844845
K::Schema: Default,
845846
V::Schema: Default,

src/persist-client/src/read.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use mz_persist_types::{Codec, Codec64};
3333
use proptest_derive::Arbitrary;
3434
use serde::{Deserialize, Serialize};
3535
use timely::PartialOrder;
36+
use timely::order::TotalOrder;
3637
use timely::progress::{Antichain, Timestamp};
3738
use tokio::runtime::Handle;
3839
use tracing::{Instrument, debug_span, warn};
@@ -113,7 +114,7 @@ impl<K, V, T, D> Subscribe<K, V, T, D>
113114
where
114115
K: Debug + Codec,
115116
V: Debug + Codec,
116-
T: Timestamp + Lattice + Codec64 + Sync,
117+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
117118
D: Semigroup + Codec64 + Send + Sync,
118119
{
119120
fn new(snapshot_parts: Vec<LeasedBatchPart<T>>, listen: Listen<K, V, T, D>) -> Self {
@@ -150,7 +151,7 @@ impl<K, V, T, D> Subscribe<K, V, T, D>
150151
where
151152
K: Debug + Codec,
152153
V: Debug + Codec,
153-
T: Timestamp + Lattice + Codec64 + Sync,
154+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
154155
D: Semigroup + Codec64 + Send + Sync,
155156
{
156157
/// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
@@ -195,7 +196,7 @@ impl<K, V, T, D> Subscribe<K, V, T, D>
195196
where
196197
K: Debug + Codec,
197198
V: Debug + Codec,
198-
T: Timestamp + Lattice + Codec64 + Sync,
199+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
199200
D: Semigroup + Codec64 + Send + Sync,
200201
{
201202
/// Politely expires this subscribe, releasing its lease.
@@ -238,7 +239,7 @@ impl<K, V, T, D> Listen<K, V, T, D>
238239
where
239240
K: Debug + Codec,
240241
V: Debug + Codec,
241-
T: Timestamp + Lattice + Codec64 + Sync,
242+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
242243
D: Semigroup + Codec64 + Send + Sync,
243244
{
244245
async fn new(mut handle: ReadHandle<K, V, T, D>, as_of: Antichain<T>) -> Self {
@@ -328,8 +329,7 @@ where
328329
// batch the last time we called this) that are strictly less_than the
329330
// batch upper to compute a new since. For totally ordered times
330331
// (currently always the case in mz) self.frontier will always have a
331-
// single element and it will be less_than upper, but the following
332-
// logic is (hopefully) correct for partially order times as well. We
332+
// single element and it will be less_than upper. We
333333
// could also abuse the fact that every time we actually emit is
334334
// guaranteed by definition to be less_than upper to be a bit more
335335
// prompt, but this would involve a lot more temporary antichains and
@@ -365,7 +365,7 @@ impl<K, V, T, D> Listen<K, V, T, D>
365365
where
366366
K: Debug + Codec,
367367
V: Debug + Codec,
368-
T: Timestamp + Lattice + Codec64 + Sync,
368+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
369369
D: Semigroup + Codec64 + Send + Sync,
370370
{
371371
/// Attempt to pull out the next values of this subscription.
@@ -440,7 +440,7 @@ impl<K, V, T, D> Listen<K, V, T, D>
440440
where
441441
K: Debug + Codec,
442442
V: Debug + Codec,
443-
T: Timestamp + Lattice + Codec64 + Sync,
443+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
444444
D: Semigroup + Codec64 + Send + Sync,
445445
{
446446
/// Fetches the contents of `part` and returns its lease.
@@ -525,7 +525,7 @@ impl<K, V, T, D> ReadHandle<K, V, T, D>
525525
where
526526
K: Debug + Codec,
527527
V: Debug + Codec,
528-
T: Timestamp + Lattice + Codec64 + Sync,
528+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
529529
D: Semigroup + Codec64 + Send + Sync,
530530
{
531531
pub(crate) async fn new(
@@ -939,7 +939,7 @@ impl<K, V, T, D> ReadHandle<K, V, T, D>
939939
where
940940
K: Debug + Codec + Ord,
941941
V: Debug + Codec + Ord,
942-
T: Timestamp + Lattice + Codec64 + Sync,
942+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
943943
D: Semigroup + Ord + Codec64 + Send + Sync,
944944
{
945945
/// Generates a [Self::snapshot], and fetches all of the batches it
@@ -1135,7 +1135,7 @@ impl<K, V, T, D> ReadHandle<K, V, T, D>
11351135
where
11361136
K: Debug + Codec + Ord,
11371137
V: Debug + Codec + Ord,
1138-
T: Timestamp + Lattice + Codec64 + Sync,
1138+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
11391139
D: Semigroup + Codec64 + Send + Sync,
11401140
{
11411141
/// Generates a [Self::snapshot], and streams out all of the updates
@@ -1188,7 +1188,7 @@ impl<K, V, T, D> ReadHandle<K, V, T, D>
11881188
where
11891189
K: Debug + Codec + Ord,
11901190
V: Debug + Codec + Ord,
1191-
T: Timestamp + Lattice + Codec64 + Ord + Sync,
1191+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
11921192
D: Semigroup + Ord + Codec64 + Send + Sync,
11931193
{
11941194
/// Test helper to generate a [Self::snapshot] call that is expected to

src/persist-client/src/write.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use proptest_derive::Arbitrary;
2929
use semver::Version;
3030
use serde::{Deserialize, Serialize};
3131
use timely::PartialOrder;
32+
use timely::order::TotalOrder;
3233
use timely::progress::{Antichain, Timestamp};
3334
use tokio::runtime::Handle;
3435
use tracing::{Instrument, debug_span, info, warn};
@@ -136,7 +137,7 @@ impl<K, V, T, D> WriteHandle<K, V, T, D>
136137
where
137138
K: Debug + Codec,
138139
V: Debug + Codec,
139-
T: Timestamp + Lattice + Codec64 + Sync,
140+
T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
140141
D: Semigroup + Ord + Codec64 + Send + Sync,
141142
{
142143
pub(crate) fn new(

src/persist-types/src/codec_impls.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use arrow::array::{
1818
StructArray,
1919
};
2020
use bytes::{BufMut, Bytes};
21-
use timely::order::Product;
2221

2322
use crate::arrow::ArrayOrd;
2423
use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
@@ -515,24 +514,6 @@ impl Opaque for u64 {
515514
}
516515
}
517516

518-
impl Codec64 for Product<u32, u32> {
519-
fn codec_name() -> String {
520-
"Product<u32, u32>".to_owned()
521-
}
522-
523-
fn encode(&self) -> [u8; 8] {
524-
let o = self.outer.to_le_bytes();
525-
let i = self.inner.to_le_bytes();
526-
[o[0], o[1], o[2], o[3], i[0], i[1], i[2], i[3]]
527-
}
528-
529-
fn decode(buf: [u8; 8]) -> Self {
530-
let outer = [buf[0], buf[1], buf[2], buf[3]];
531-
let inner = [buf[4], buf[5], buf[6], buf[7]];
532-
Product::new(u32::from_le_bytes(outer), u32::from_le_bytes(inner))
533-
}
534-
}
535-
536517
// TODO: Remove this once we wrap coord epochs in an `Epoch` struct and impl
537518
// Opaque on `Epoch` instead.
538519
impl Opaque for i64 {

src/storage-client/src/storage_collections.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3194,7 +3194,7 @@ async fn finalize_shards_task<T>(
31943194
read_only,
31953195
}: FinalizeShardsTaskConfig,
31963196
) where
3197-
T: TimelyTimestamp + Lattice + Codec64 + Sync,
3197+
T: TimelyTimestamp + TotalOrder + Lattice + Codec64 + Sync,
31983198
{
31993199
if read_only {
32003200
info!("disabling shard finalization in read only mode");

src/storage-controller/src/persist_handles.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl<T: Timestamp + Lattice + Codec64> PersistTableWriteCmd<T> {
9595
}
9696
}
9797

98-
async fn append_work<T2: Timestamp + Lattice + Codec64 + Sync>(
98+
async fn append_work<T2: Timestamp + TotalOrder + Lattice + Codec64 + Sync>(
9999
write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T2, StorageDiff>>,
100100
mut commands: BTreeMap<
101101
GlobalId,

0 commit comments

Comments
 (0)