diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index 136d70a9dc6f..c404c325be4b 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -832,6 +832,22 @@ private void shutDown() throws InterruptedException
         }
     }
 
+    public long getDiskSpaceUsed()
+    {
+        long totalSize = 0;
+
+        try (ReferencedSegments refs = selectAndReference(s -> true))
+        {
+            for (Segment segment : refs.all())
+            {
+                File dataFile = segment.descriptor.fileFor(Component.DATA);
+                if (dataFile.exists())
+                    totalSize += dataFile.length();
+            }
+        }
+        return totalSize;
+    }
+
     private ActiveSegment createSegment()
     {
         Descriptor descriptor = Descriptor.create(directory, nextSegmentId.getAndIncrement(), params.userVersion());
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 44bcf2d6cf87..133acedcbf96 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -139,6 +139,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
             .add(KeyspaceMetrics.TYPE_NAME)
             .add(MemtablePool.TYPE_NAME)
             .add(MessagingMetrics.TYPE_NAME)
+            .add(MutationTrackingMetrics.TYPE_NAME)
             .add(MutualTlsMetrics.TYPE_NAME)
             .add(PaxosMetrics.TYPE_NAME)
             .add(ReadRepairMetrics.TYPE_NAME)
diff --git a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
new file mode 100644
index 000000000000..822d7c5ac12d
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class MutationTrackingMetrics
+{
+    public static final String TYPE_NAME = "MutationTracking";
+    private static final MetricNameFactory factory = new DefaultNameFactory(TYPE_NAME);
+
+    public static final MutationTrackingMetrics instance = new MutationTrackingMetrics();
+
+    public final Counter broadcastOffsetsDiscovered; // Newly-witnessed offsets discovered via broadcast
+    public final Counter writeTimeOffsetsDiscovered; // Newly-witnessed offsets discovered at write time
+    public final Histogram readSummarySize; // Read summary sizes
+    public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
+    public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
+
+    @SuppressWarnings("Convert2MethodRef")
+    private MutationTrackingMetrics()
+    {
+        broadcastOffsetsDiscovered = Metrics.counter(factory.createMetricName("BroadcastOffsetsDiscovered"));
+        writeTimeOffsetsDiscovered = Metrics.counter(factory.createMetricName("WriteTimeOffsetsDiscovered"));
+        readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true);
+        unreconciledMutationCount = Metrics.register(
+            factory.createMetricName("UnreconciledMutationCount"),
+            () -> MutationTrackingService.instance.getUnreconciledMutationCount()
+        );
+        journalDiskSpaceUsed = Metrics.register(
+            factory.createMetricName("JournalDiskSpaceUsed"),
+            () -> MutationJournal.instance.getDiskSpaceUsed()
+        );
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 6f4a4aed4cd9..1b1273e009c3 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -45,6 +45,7 @@
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -208,8 +209,14 @@ void updateReplicatedOffsets(Offsets offsets, boolean persisted, int onNodeId)
 
     private void updateWitnessedReplicatedOffsets(Offsets offsets, int onNodeId)
     {
+        // Track newly-witnessed offsets from broadcasts (a single-element array, so the lambda can mutate it)
+        int[] newlyWitnessedCount = {0};
+
         witnessedOffsets.get(onNodeId).addAll(offsets, (ignore, start, end) -> {
+            // Count the newly-witnessed offsets in this range
+            newlyWitnessedCount[0] += (end - start + 1);
+
             for (int offset = start; offset <= end; ++offset)
             {
                 // TODO (desired): use the fact that Offsets are ordered to optimise this look up
@@ -220,6 +227,9 @@ private void updateWitnessedReplicatedOffsets(Offsets offsets, int onNodeId)
                 }
             }
         });
+
+        // Record metric for newly-witnessed offsets only
+        MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.inc(newlyWitnessedCount[0]);
     }
 
     private void updatePersistedReplicatedOffsets(Offsets offsets, int onNodeId)
@@ -272,6 +282,19 @@ Offsets.Immutable collectReconciledOffsets()
         }
     }
 
+    public long getUnreconciledCount()
+    {
+        lock.readLock().lock();
+        try
+        {
+            return unreconciledMutations.size();
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     boolean startWriting(Mutation mutation)
     {
         lock.writeLock().lock();
@@ -301,6 +324,9 @@ void finishWriting(Mutation mutation)
         if (!witnessedOffsets.get(localNodeId).add(offset))
             return;
 
+        // Track write-time discovery of a newly-witnessed offset
+        MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.inc();
+
         unreconciledMutations.finishWriting(mutation);
 
         if (remoteReplicasWitnessed(offset))
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java
index e2a2a82e642e..21ff7d49c669 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -746,4 +746,9 @@ public int countStaticSegmentsForTesting()
     {
         return journal.countStaticSegmentsForTesting();
     }
+
+    public long getDiskSpaceUsed()
+    {
+        return journal.getDiskSpaceUsed();
+    }
 }
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 4009dbd39a88..bfbced4ae414 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -54,6 +54,7 @@
 import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
@@ -330,7 +331,9 @@ public MutationSummary createSummaryForKey(DecoratedKey key, TableId tableId, boolean includePending)
         shardLock.readLock().lock();
         try
         {
-            return getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
+            MutationSummary summary = getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
+            MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
+            return summary;
         }
         finally
         {
@@ -343,7 +346,9 @@ public MutationSummary createSummaryForRange(AbstractBounds range, TableId tableId, boolean includePending)
         shardLock.readLock().lock();
         try
         {
-            return getOrCreateShards(tableId).createSummaryForRange(range, tableId, includePending);
+            MutationSummary summary = getOrCreateShards(tableId).createSummaryForRange(range, tableId, includePending);
+            MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
+            return summary;
         }
         finally
         {
@@ -370,6 +375,20 @@ void forEachKeyspace(Consumer consumer)
         }
     }
 
+    public long getUnreconciledMutationCount()
+    {
+        if (!isStarted())
+            return 0L;
+
+        final long[] count = {0L};
+        forEachKeyspace(ks -> {
+            ks.forEachShard(shard -> {
+                count[0] += shard.getUnreconciledCount();
+            });
+        });
+        return count[0];
+    }
+
     public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into)
     {
         shardLock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index 7274127fcb74..52525e247b0f 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -297,6 +297,16 @@ private CoordinatorLog getOrCreate(CoordinatorLogId logId)
     {
         return getOrCreate(logId.asLong());
     }
 
+    public long getUnreconciledCount()
+    {
+        long count = 0;
+        for (CoordinatorLog log : logs.values())
+        {
+            count += log.getUnreconciledCount();
+        }
+        return count;
+    }
+
     @Nonnull
     private CoordinatorLog get(CoordinatorLogId logId)
     {
diff --git a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index 036f624b4295..8cdeaa302fb6 100644
--- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -228,6 +228,11 @@ boolean isEmpty()
     {
         return statesMap.isEmpty();
     }
 
+    public int size()
+    {
+        return statesMap.size();
+    }
+
     static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, int localNodeId)
     {
         UnreconciledMutations result = new UnreconciledMutations();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
new file mode 100644
index 000000000000..54d3c6b97348
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MutationTrackingMetricsTest extends TestBaseImpl
+{
+    private static final String CREATE_KEYSPACE =
+        "CREATE KEYSPACE %s WITH replication = " +
+        "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+        "AND replication_type = 'tracked'";
+
+    private static final String CREATE_TABLE =
+        "CREATE TABLE %s.tbl (pk int PRIMARY KEY, val text)";
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testWriteTimeOffsetsDiscoveredMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get initial write-time discovery counts on all nodes
+            long initialNode1Count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long initialNode2Count = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long initialNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+
+            // Perform writes with QUORUM - each write goes to at least 2 replicas
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Wait for all nodes to discover offsets at write time.
+            // With RF=3, each node should discover offsets, and the total should be at least numWrites * 3
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long node1Delta = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode1Count;
+                          long node2Delta = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode2Count;
+                          long node3Delta = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode3Count;
+                          long totalDiscovered = node1Delta + node2Delta + node3Delta;
+
+                          return node1Delta > 0 && node2Delta > 0 && node3Delta > 0 && totalDiscovered >= (long) numWrites * 3;
+                      });
+
+            // Verify final counts
+            long afterNode1Count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long afterNode2Count = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long afterNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+
+            long node1Delta = afterNode1Count - initialNode1Count;
+            long node2Delta = afterNode2Count - initialNode2Count;
+            long node3Delta = afterNode3Count - initialNode3Count;
+
+            assertThat(node1Delta)
+                .as("Node 1 should have discovered offsets at write time")
+                .isGreaterThan(0L);
+
+            assertThat(node2Delta)
+                .as("Node 2 should have discovered offsets at write time")
+                .isGreaterThan(0L);
+
+            assertThat(node3Delta)
+                .as("Node 3 should have discovered offsets at write time")
+                .isGreaterThan(0L);
+
+            long totalDiscovered = node1Delta + node2Delta + node3Delta;
+            assertThat(totalDiscovered)
+                .as("Total write-time discoveries across all nodes should be at least %d (RF=3)", numWrites * 3)
+                .isGreaterThanOrEqualTo((long) numWrites * 3);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testBroadcastOffsetsDiscoveredMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Record the initial broadcast metric on node 3, since we are next going to block this node from receiving mutations
+            long initialNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+
+            // Block node 3 from receiving mutation writes (but allow broadcast messages)
+            cluster.filters().verbs(Verb.MUTATION_REQ.id).to(3).drop();
+
+            // Write data - nodes 1 and 2 will get it, node 3 won't
+            int numWrites = 5;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Verify node 3 missed the writes
+            Object[][] node3Before = cluster.coordinator(3).execute(
+                withKeyspace("SELECT * FROM %s.tbl"), ConsistencyLevel.ONE);
+            assertThat(node3Before.length)
+                .as("Node 3 should have no data (was blocked)")
+                .isEqualTo(0);
+
+            // Broadcast offsets from node 1 to the other nodes.
+            // This tells node 3 about mutations it's missing
+            cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Wait for broadcasts to propagate to node 3
+            long[] previousCount = {0};
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long currentCount = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+                          boolean hasDiscoveredOffsets = currentCount > initialNode3Count;
+                          boolean isStable = hasDiscoveredOffsets && currentCount == previousCount[0];
+                          previousCount[0] = currentCount;
+                          return isStable;
+                      });
+
+            // Get the count after the first broadcast
+            long afterFirstBroadcast = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+
+            // Broadcast the same offsets again (duplicate) - should NOT increment the metric
+            cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Wait for the duplicate broadcast to propagate, then verify the metric stayed the same.
+            // We poll to ensure the broadcast had time to arrive, then check that it didn't increment
+            Awaitility.await()
+                      .pollDelay(Duration.ofMillis(200))
+                      .atMost(Duration.ofSeconds(2))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+                          return count == afterFirstBroadcast; // Should remain at the same value (a duplicate doesn't increment)
+                      });
+
+            // Clear the filter to allow reconciliation
+            cluster.filters().reset();
+
+            // Read from node 3 to trigger reconciliation using broadcast offsets.
+            // Node 3 knows it's missing data (from broadcast offsets) and will request it.
+            // Poll for reconciliation to complete
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(10))
+                      .pollInterval(Duration.ofMillis(200))
+                      .until(() -> {
+                          Object[][] result = cluster.coordinator(3).execute(
+                              withKeyspace("SELECT * FROM %s.tbl"),
+                              ConsistencyLevel.QUORUM);
+                          return result.length == numWrites;
+                      });
+
+            // Verify all rows are present after reconciliation
+            Object[][] result = cluster.coordinator(3).execute(
+                withKeyspace("SELECT * FROM %s.tbl"),
+                ConsistencyLevel.QUORUM);
+            assertThat(result.length)
+                .as("Should return all rows after reconciliation")
+                .isEqualTo(numWrites);
+
+            // Check metrics after reconciliation - if reconciliation worked, broadcasts happened
+            long afterNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+            long node3Delta = afterNode3Count - initialNode3Count;
+
+            // Node 3 was blocked before and must now have applied broadcast offsets
+            assertThat(node3Delta)
+                .as("Node 3 should have applied broadcast offsets")
+                .isGreaterThan(0L);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testReadSummarySizeMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get the initial histogram count from the coordinator node
+            long initialCount = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.readSummarySize.getCount());
+
+            // Insert test data
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Execute read operations (the metric should record one value per summary created)
+            int numReads = 10;
+            for (int i = 0; i < numReads; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"),
+                                               ConsistencyLevel.QUORUM, i);
+            }
+
+            // Verify the metric incremented by at least twice the number of reads, as
+            // each read creates TWO summaries: initial (before the read) + secondary (after the read).
+            // The second summary detects concurrent writes during read execution for proper reconciliation
+            long afterCount = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.readSummarySize.getCount());
+
+            long delta = afterCount - initialCount;
+            assertThat(delta)
+                .as("Should have recorded at least 2 * %d summaries", numReads)
+                .isGreaterThanOrEqualTo(2L * numReads);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testUnreconciledMutationCountMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get the initial unreconciled count (should be 0)
+            long initialCount = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(initialCount)
+                .as("Initial unreconciled count should be 0")
+                .isEqualTo(0L);
+
+            // Block node 3 from receiving messages from node 1
+            cluster.filters().verbs(Verb.MUTATION_REQ.id).from(1).to(3).drop();
+
+            // Write with QUORUM (only nodes 1 and 2 will receive writes, node 3 won't)
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Node 1 should now have unreconciled mutations (since node 3 didn't get them)
+            long afterWrites = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(afterWrites)
+                .as("Expected %d unreconciled mutations (node 3 blocked)", numWrites)
+                .isEqualTo((long) numWrites);
+
+            // Clear filters to allow reconciliation
+            cluster.filters().reset();
+
+            // Perform reads to trigger reconciliation
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"),
+                                               ConsistencyLevel.QUORUM, i);
+            }
+
+            // Wait for reconciliation to complete
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+                          return count == 0;
+                      });
+
+            // Verify reconciliation actually happened
+            long afterReconcile = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(afterReconcile)
+                .as("Unreconciled count should be 0 after reconciliation")
+                .isEqualTo(0L);
+        }
+    }
+
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testJournalDiskSpaceUsedMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(1)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .set("commitlog_segment_size", "1MiB")) // Use a smaller segment size
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get the initial disk space - should be 2 * 1024 * 1024, as 2 segments are allocated by default
+            long initialSpace = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.journalDiskSpaceUsed.getValue());
+
+            // Write enough data to fill the 1MiB segment and force new segment creation
+            int numWrites = 200;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(
+                    withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                    ConsistencyLevel.ONE, i, "test-" + i);
+
+                // Close the segment every 20 writes to create multiple segments
+                if (i % 20 == 0 && i > 0)
+                    cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+            }
+
+            // Verify disk space increased
+            long afterWrites = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.journalDiskSpaceUsed.getValue());
+
+            assertThat(afterWrites)
+                .as("Disk space should increase after writes: before=%d", initialSpace)
+                .isGreaterThan(initialSpace);
+        }
+    }
+}
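
Reviewer note: the new metrics register under the DefaultNameFactory type "MutationTracking", so they should surface over JMX alongside Cassandra's existing metric types. Below is a minimal sketch (not part of the patch) of how an operator might read them from a running node. It assumes the usual org.apache.cassandra.metrics JMX naming convention, an unauthenticated JMX endpoint on the default port 7199, and the standard codahale attribute names ("Count" for counters, "Value" for gauges); adjust for your deployment.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MutationTrackingMetricsReader
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical endpoint; substitute the host/port of the node under inspection
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();

            // Counter added by this patch: cumulative newly-witnessed offsets learned via broadcast
            Object broadcast = mbs.getAttribute(
                new ObjectName("org.apache.cassandra.metrics:type=MutationTracking,name=BroadcastOffsetsDiscovered"),
                "Count");

            // Gauge added by this patch: current number of unreconciled mutations across all shards
            Object unreconciled = mbs.getAttribute(
                new ObjectName("org.apache.cassandra.metrics:type=MutationTracking,name=UnreconciledMutationCount"),
                "Value");

            System.out.println("BroadcastOffsetsDiscovered = " + broadcast);
            System.out.println("UnreconciledMutationCount  = " + unreconciled);
        }
    }
}

A gauge that returns to zero while the counters keep rising would be the expected steady state; a persistently high UnreconciledMutationCount would suggest reconciliation is falling behind.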
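
Side note on the counting pattern in updateWitnessedReplicatedOffsets() and getUnreconciledMutationCount(): Java lambdas can only capture effectively final locals, hence the single-element-array wrapper around the accumulator. A standalone sketch of the idiom (illustrative only, not from the patch):

import java.util.List;

public class LambdaAccumulatorIdiom
{
    public static void main(String[] args)
    {
        List<Integer> summarySizes = List.of(3, 5, 7); // hypothetical sample values

        // 'long total = 0; summarySizes.forEach(n -> total += n);' would not compile:
        // a captured local must be effectively final. Wrapping the accumulator in a
        // one-element array keeps the reference final while the cell stays mutable.
        long[] total = {0L};
        summarySizes.forEach(n -> total[0] += n);

        System.out.println("total = " + total[0]); // prints: total = 15
    }
}

A LongAdder or a plain loop would work just as well; the array idiom is the lightest option when the callback runs synchronously on a single thread, as it appears to in both call sites here.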