Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ build:optimized --config=debuginfo-none
build:optimized --copt=-fno-lto
build:optimized --linkopt=-fno-lto
build:optimized --@rules_rust//rust/settings:lto=off
build:optimized --cxxopt=-isystem/opt/homebrew/opt/llvm/include

# Build with the Rust Nightly Toolchain
build:rust-nightly --@rules_rust//rust/toolchain/channel=nightly
Expand Down
4 changes: 2 additions & 2 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ steps:
agents:
queue: builder-linux-x86_64
env:
CI_SANITIZER: address
CI_SANITIZER: none
CI_BAZEL_BUILD: 1

sanitizer: skip
Expand All @@ -70,7 +70,7 @@ steps:
agents:
queue: builder-linux-aarch64-mem
env:
CI_SANITIZER: address
CI_SANITIZER: none
CI_BAZEL_BUILD: 1
sanitizer: skip
branches: "main"
Expand Down
10 changes: 9 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,15 @@ def get_variable_system_parameters(
"83886080",
["67108864", "134217728", "536870912", "1073741824"],
),
VariableSystemParameter(
"persist_enable_incremental_compaction",
("true" if version > MzVersion.parse_mz("v0.154.0-dev") else "false"),
(
["true", "false"]
if version > MzVersion.parse_mz("v0.154.0-dev")
else ["false"]
),
),
VariableSystemParameter(
"persist_use_critical_since_catalog", "true", ["true", "false"]
),
Expand Down Expand Up @@ -566,7 +575,6 @@ def get_default_system_parameters(
"compute_peek_response_stash_read_memory_budget_bytes",
"compute_peek_stash_num_batches",
"compute_peek_stash_batch_size",
"persist_enable_incremental_compaction",
"storage_statistics_retention_duration",
"enable_ctp_cluster_protocols",
"enable_paused_cluster_readhold_downgrade",
Expand Down
18 changes: 9 additions & 9 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,15 +1328,15 @@ impl UnopenedPersistCatalogState {
let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
// We're going to gradually turn this on via dyncfgs. Run it in a task so that it
// doesn't block startup.
let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
let () =
mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
&write_handle,
|| fuel.get(),
|| wait.get(),
)
.await;
});
// let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
// let () =
mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
&write_handle,
|| fuel.get(),
|| wait.get(),
)
.await;
// });
}

Ok((Box::new(catalog), audit_log_handle))
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/src/cli/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
&machine,
req,
write.write_schemas.clone(),
true,
)
.await;
let apply_maintenance = match res {
Expand Down
47 changes: 39 additions & 8 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ where
compact_span.follows_from(&Span::current());
let gc = gc.clone();
mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
let res = Self::compact_and_apply(&machine, req, write_schemas)
let res = Self::compact_and_apply(&machine, req, write_schemas, false)
.instrument(compact_span)
.await;
if let Ok(maintenance) = res {
Expand Down Expand Up @@ -338,6 +338,7 @@ where
machine: &Machine<K, V, T, D>,
req: CompactReq<T>,
write_schemas: Schemas<K, V>,
disable_incremental: bool,
) -> Result<RoutineMaintenance, anyhow::Error> {
let metrics = Arc::clone(&machine.applier.metrics);
metrics.compaction.started.inc();
Expand Down Expand Up @@ -422,8 +423,8 @@ where
.iter()
.all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

let incremental_enabled = ENABLE_INCREMENTAL_COMPACTION
.get(&machine_clone.applier.cfg)
let incremental_enabled = !disable_incremental
&& ENABLE_INCREMENTAL_COMPACTION.get(&machine_clone.applier.cfg)
&& all_runs_have_uuids
&& all_runs_have_len;
let stream = Self::compact_stream(
Expand Down Expand Up @@ -695,12 +696,42 @@ where
.collect::<BTreeSet<_>>();
match batch_ids.iter().exactly_one().ok() {
Some(batch_id) => {
CompactionInput::PartialBatch(
*batch_id,
run_ids
)
// We are compacting runs from exactly one batch. Decide whether this
// chunk represents the entire batch or only a subset of its runs.

let full_run_count = req.inputs
.iter()
.find(|x| x.id == *batch_id)
.map(|b| b.batch.run_meta.len())
.expect("trying to replace with an ID not present in input!");

let covers_whole_batch = run_ids.len() == full_run_count;
if !covers_whole_batch {
tracing::info!(
"incremental compaction of batch {:?} with {} runs, \
compacting {} out of {} runs",
batch_id,
full_run_count,
run_ids.len(),
full_run_count
);
CompactionInput::PartialBatch(*batch_id, run_ids)
} else {
tracing::info!(
"incremental compaction of batch {:?} with {} runs, \
compacting all runs",
batch_id,
full_run_count,
);
input_id_range(batch_ids)
}
}
None => input_id_range(batch_ids),
None => {
tracing::info!(
"incremental compaction of multiple batches: {:?}",
batch_ids,
);
input_id_range(batch_ids)},
}
} else {
input_id_range(batch_ids)
Expand Down
13 changes: 9 additions & 4 deletions src/persist-client/src/internal/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
use tracing::error;
use tracing::{error, info};

use crate::internal::paths::WriterKey;
use crate::internal::state::{HollowBatch, RunId};
Expand Down Expand Up @@ -1685,6 +1685,7 @@ impl<T: Timestamp + Lattice> Spine<T> {
// If any merges exist, we can directly call `apply_fuel`.
self.apply_fuel(&fuel, log);
} else {
info!("no merges in progress, introducing empty batch");
// Otherwise, we'll need to introduce fake updates to move merges
// along.

Expand Down Expand Up @@ -1754,15 +1755,19 @@ impl<T: Timestamp + Lattice> Spine<T> {
/// words, there are either zero runs (fully empty) or exactly one logical
/// run of data remaining.
fn reduced(&self) -> bool {
self.spine_batches()
let total_runs = self
.spine_batches()
.map(|b| {
b.parts
.iter()
.map(|p| p.batch.run_meta.len())
.sum::<usize>()
})
.sum::<usize>()
< 2
.sum::<usize>();
let total_spine_batches = self.spine_batches().count();
info!(?total_runs, ?total_spine_batches, "checking if reduced");
// total_spine_batches < 2
total_runs < 2
}

/// Describes the merge progress of layers in the trace.
Expand Down
56 changes: 30 additions & 26 deletions test/limits/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def body(cls) -> None:
# three extra connections for mz_system, default connection, and one
# since sqlparse 0.4.4. 3 reserved superuser connections since materialize#25666
# try bumping limit a bit further since this is sometimes flaky
print(f"ALTER SYSTEM SET max_connections = {Connections.COUNT+10};")
print(f"ALTER SYSTEM SET max_connections = {Connections.COUNT + 10};")

for i in cls.all():
print(
Expand Down Expand Up @@ -308,6 +308,7 @@ class KafkaSourcesSameTopic(Generator):

@classmethod
def body(cls) -> None:
print("$ set-sql-timeout duration=300s")
print("$ postgres-execute connection=mz_system")
print(f"ALTER SYSTEM SET max_sources = {cls.COUNT * 10};")
print("$ postgres-execute connection=mz_system")
Expand Down Expand Up @@ -367,7 +368,7 @@ def body(cls) -> None:
'$ set value-schema={"type": "record", "name": "r", "fields": [{"name": "f1", "type": "string"}]}'
)
print(
f"$ kafka-create-topic topic=kafka-partitions partitions={round(cls.COUNT/2)}"
f"$ kafka-create-topic topic=kafka-partitions partitions={round(cls.COUNT / 2)}"
)
print(
"$ kafka-ingest format=avro topic=kafka-partitions key-format=avro key-schema=${key-schema} schema=${value-schema} partition=-1"
Expand Down Expand Up @@ -707,7 +708,7 @@ def body(cls) -> None:
print("> CREATE TABLE t1 (f1 INTEGER);")
print("> INSERT INTO t1 VALUES (1);")
table_list = ", ".join(f"t1 as a{i}" for i in cls.all())
condition_list = " AND ".join(f"a{i}.f1 = a{i+1}.f1" for i in cls.no_last())
condition_list = " AND ".join(f"a{i}.f1 = a{i + 1}.f1" for i in cls.no_last())
cls.store_explain_and_run(f"SELECT * FROM {table_list} WHERE {condition_list};")
print(" ".join("1" for i in cls.all()))

Expand Down Expand Up @@ -747,7 +748,7 @@ def body(cls) -> None:
print("> CREATE TABLE t1 (f1 INTEGER);")
print("> INSERT INTO t1 VALUES (1);")
table_list = " LEFT JOIN ".join(
f"t1 as a{i} ON (a{i-1}.f1 = a{i}.f1)" for i in cls.no_first()
f"t1 as a{i} ON (a{i - 1}.f1 = a{i}.f1)" for i in cls.no_first()
)
cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 LEFT JOIN {table_list};")
print(" ".join("1" for i in cls.all()))
Expand Down Expand Up @@ -884,7 +885,7 @@ def body(cls) -> None:
print("> CREATE VIEW v0 (f1) AS SELECT f1 FROM t;")

for i in cls.all():
print(f"> CREATE VIEW v{i} AS SELECT f1 + 1 AS f1 FROM v{i-1};")
print(f"> CREATE VIEW v{i} AS SELECT f1 + 1 AS f1 FROM v{i - 1};")

cls.store_explain_and_run(f"SELECT * FROM v{cls.COUNT};")
print(f"{cls.COUNT}")
Expand All @@ -910,7 +911,7 @@ def body(cls) -> None:

for i in cls.all():
print(
f"> CREATE MATERIALIZED VIEW v{i} AS SELECT f1 + 1 AS f1 FROM v{i-1};"
f"> CREATE MATERIALIZED VIEW v{i} AS SELECT f1 + 1 AS f1 FROM v{i - 1};"
)

cls.store_explain_and_run(f"SELECT * FROM v{cls.COUNT};")
Expand Down Expand Up @@ -946,7 +947,7 @@ def body(cls) -> None:
print("> CREATE TABLE t1 (f1 INTEGER);")
print("> INSERT INTO t1 VALUES " + ", ".join(f"({i})" for i in cls.all()))
cte_list = ", ".join(
f"a{i} AS (SELECT f1 + 1 AS f1 FROM a{i-1} WHERE f1 <= {i})"
f"a{i} AS (SELECT f1 + 1 AS f1 FROM a{i - 1} WHERE f1 <= {i})"
for i in cls.no_first()
)
table_list = ", ".join(f"a{i}" for i in cls.all())
Expand All @@ -966,7 +967,7 @@ def body(cls) -> None:
print("> CREATE TABLE t1 (f1 INTEGER);")
print("> INSERT INTO t1 VALUES (1)")
cte_list = ", ".join(
f"a{i} AS (SELECT a{i-1}.f1 + 0 AS f1 FROM a{i-1}, t1 WHERE a{i-1}.f1 = t1.f1)"
f"a{i} AS (SELECT a{i - 1}.f1 + 0 AS f1 FROM a{i - 1}, t1 WHERE a{i - 1}.f1 = t1.f1)"
for i in cls.no_first()
)
cls.store_explain_and_run(
Expand All @@ -991,7 +992,7 @@ def body(cls) -> None:
for i in cls.all()
)
cls.store_explain_and_run(f"SELECT * FROM {table_list};")
print(" ".join(f"{i+1}" for i in cls.all()))
print(" ".join(f"{i + 1}" for i in cls.all()))


class Lateral(Generator):
Expand All @@ -1006,7 +1007,7 @@ def body(cls) -> None:
print("> CREATE TABLE t1 (f1 INTEGER);")
print("> INSERT INTO t1 VALUES (1)")
table_list = ", LATERAL ".join(
f"(SELECT t1.f1 + {i-1} AS f1 FROM t1 WHERE f1 <= a{i-1}.f1) AS a{i}"
f"(SELECT t1.f1 + {i - 1} AS f1 FROM t1 WHERE f1 <= a{i - 1}.f1) AS a{i}"
for i in cls.no_first()
)
cls.store_explain_and_run(f"SELECT * FROM t1 AS a1 , LATERAL {table_list};")
Expand Down Expand Up @@ -1314,7 +1315,7 @@ def body(cls) -> None:
)
print("> CREATE DEFAULT INDEX ON v")
cls.store_explain_and_run("SELECT LENGTH(c) FROM v")
print(f"{cls.COUNT*1024}")
print(f"{cls.COUNT * 1024}")


class ArrayAgg(Generator):
Expand All @@ -1325,25 +1326,27 @@ def body(cls) -> None:
print("> SET statement_timeout='300s'")
print(
f"""> CREATE TABLE t ({
", ".join(
", ".join([
f"a{i} STRING",
f"b{i} STRING",
f"c{i} STRING",
f"d{i} STRING[]",
])
for i in cls.all()
)
});"""
", ".join(
", ".join(
[
f"a{i} STRING",
f"b{i} STRING",
f"c{i} STRING",
f"d{i} STRING[]",
]
)
for i in cls.all()
)
});"""
)
print("> INSERT INTO t DEFAULT VALUES;")
print(
f"""> CREATE MATERIALIZED VIEW v2 AS SELECT {
", ".join(
f"ARRAY_AGG(a{i} ORDER BY b1) FILTER (WHERE 's{i}' = ANY(d{i})) AS r{i}"
for i in cls.all()
)
} FROM t GROUP BY a1;"""
", ".join(
f"ARRAY_AGG(a{i} ORDER BY b1) FILTER (WHERE 's{i}' = ANY(d{i})) AS r{i}"
for i in cls.all()
)
} FROM t GROUP BY a1;"""
)
print("> CREATE DEFAULT INDEX ON v2;")

Expand Down Expand Up @@ -1910,6 +1913,7 @@ def app_password(email: str) -> str:
metadata_store="cockroach",
listeners_config_path=f"{MZ_ROOT}/src/materialized/ci/listener_configs/no_auth_https.json",
support_external_clusterd=True,
additional_system_parameter_defaults={"log_filter": "debug"},
),
Mz(app_password=""),
]
Expand Down