Skip to content

Commit e289904

Browse files
Add metrics to MutationTracking
1 parent 7846614 commit e289904

File tree

9 files changed

+501
-1
lines changed

9 files changed

+501
-1
lines changed

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,32 @@ private void shutDown() throws InterruptedException
832832
}
833833
}
834834

835+
public long getDiskSpaceUsed()
836+
{
837+
long totalSize = 0;
838+
839+
try (ReferencedSegments<K, V> refs = selectAndReference(s -> true))
840+
{
841+
for (Segment<K, V> segment : refs.all())
842+
{
843+
File dataFile = segment.descriptor.fileFor(Component.DATA);
844+
if (dataFile.exists())
845+
totalSize += dataFile.length();
846+
}
847+
}
848+
849+
// Add current active segment
850+
ActiveSegment<K, V> active = currentSegment;
851+
if (active != null)
852+
{
853+
File activeFile = active.descriptor.fileFor(Component.DATA);
854+
if (activeFile.exists())
855+
totalSize += activeFile.length();
856+
}
857+
858+
return totalSize;
859+
}
860+
835861
private ActiveSegment<K, V> createSegment()
836862
{
837863
Descriptor descriptor = Descriptor.create(directory, nextSegmentId.getAndIncrement(), params.userVersion());

src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
139139
.add(KeyspaceMetrics.TYPE_NAME)
140140
.add(MemtablePool.TYPE_NAME)
141141
.add(MessagingMetrics.TYPE_NAME)
142+
.add(MutationTrackingMetrics.TYPE_NAME)
142143
.add(MutualTlsMetrics.TYPE_NAME)
143144
.add(PaxosMetrics.TYPE_NAME)
144145
.add(ReadRepairMetrics.TYPE_NAME)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.metrics;
20+
21+
import com.codahale.metrics.Counter;
22+
import com.codahale.metrics.Gauge;
23+
import com.codahale.metrics.Histogram;
24+
import org.apache.cassandra.replication.MutationJournal;
25+
import org.apache.cassandra.replication.MutationTrackingService;
26+
27+
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
28+
29+
public class MutationTrackingMetrics
30+
{
31+
public static final String TYPE_NAME = "MutationTracking";
32+
private static final MetricNameFactory factory = new DefaultNameFactory(TYPE_NAME);
33+
34+
public static final MutationTrackingMetrics instance = new MutationTrackingMetrics();
35+
36+
public final Counter broadcastOffsetsDiscovered; // Newly-witnessed offsets discovered via broadcast
37+
public final Counter writeTimeOffsetsDiscovered; // Newly-witnessed offsets discovered at write time
38+
public final Histogram readSummarySize; // Read summary sizes
39+
public final Gauge<Long> unreconciledMutationCount; // Number of unreconciled mutations
40+
public final Gauge<Long> journalDiskSpaceUsed; // Size of MutationJournal on disk
41+
42+
@SuppressWarnings("Convert2MethodRef")
43+
private MutationTrackingMetrics()
44+
{
45+
broadcastOffsetsDiscovered = Metrics.counter(factory.createMetricName("broadcastOffsetsDiscovered"));
46+
writeTimeOffsetsDiscovered = Metrics.counter(factory.createMetricName("WriteTimeOffsetsDiscovered"));
47+
readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), false);
48+
unreconciledMutationCount = Metrics.register(
49+
factory.createMetricName("UnreconciledMutationCount"),
50+
() -> MutationTrackingService.instance.getUnreconciledMutationCount()
51+
);
52+
journalDiskSpaceUsed = Metrics.register(
53+
factory.createMetricName("JournalDiskSpaceUsed"),
54+
() -> MutationJournal.instance.getDiskSpaceUsed()
55+
);
56+
}
57+
}

src/java/org/apache/cassandra/replication/CoordinatorLog.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.cassandra.dht.AbstractBounds;
4646
import org.apache.cassandra.dht.Range;
4747
import org.apache.cassandra.dht.Token;
48+
import org.apache.cassandra.metrics.MutationTrackingMetrics;
4849
import org.apache.cassandra.schema.SchemaConstants;
4950
import org.apache.cassandra.schema.TableId;
5051
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -208,8 +209,14 @@ void updateReplicatedOffsets(Offsets offsets, boolean persisted, int onNodeId)
208209

209210
private void updateWitnessedReplicatedOffsets(Offsets offsets, int onNodeId)
210211
{
212+
// Track newly-witnessed offsets from broadcasts (use array for lambda)
213+
int[] newlyWitnessedCount = {0};
214+
211215
witnessedOffsets.get(onNodeId).addAll(offsets, (ignore, start, end) ->
212216
{
217+
// Count the newly-witnessed offsets in this range
218+
newlyWitnessedCount[0] += (end - start + 1);
219+
213220
for (int offset = start; offset <= end; ++offset)
214221
{
215222
// TODO (desired): use the fact that Offsets are ordered to optimise this look up
@@ -220,6 +227,9 @@ private void updateWitnessedReplicatedOffsets(Offsets offsets, int onNodeId)
220227
}
221228
}
222229
});
230+
231+
// Record metric for newly witnessed offsets only
232+
MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.inc(newlyWitnessedCount[0]);
223233
}
224234

225235
private void updatePersistedReplicatedOffsets(Offsets offsets, int onNodeId)
@@ -272,6 +282,19 @@ Offsets.Immutable collectReconciledOffsets()
272282
}
273283
}
274284

285+
public long getUnreconciledCount()
286+
{
287+
lock.readLock().lock();
288+
try
289+
{
290+
return unreconciledMutations.size();
291+
}
292+
finally
293+
{
294+
lock.readLock().unlock();
295+
}
296+
}
297+
275298
boolean startWriting(Mutation mutation)
276299
{
277300
lock.writeLock().lock();
@@ -301,6 +324,9 @@ void finishWriting(Mutation mutation)
301324
if (!witnessedOffsets.get(localNodeId).add(offset))
302325
return;
303326

327+
// Track write-time discovery of newly-witnessed offset
328+
MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.inc();
329+
304330
unreconciledMutations.finishWriting(mutation);
305331

306332
if (remoteReplicasWitnessed(offset))

src/java/org/apache/cassandra/replication/MutationJournal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,4 +746,9 @@ public int countStaticSegmentsForTesting()
746746
{
747747
return journal.countStaticSegmentsForTesting();
748748
}
749+
750+
public long getDiskSpaceUsed()
751+
{
752+
return journal.getDiskSpaceUsed();
753+
}
749754
}

src/java/org/apache/cassandra/replication/MutationTrackingService.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.cassandra.exceptions.RequestFailure;
5555
import org.apache.cassandra.gms.FailureDetector;
5656
import org.apache.cassandra.locator.InetAddressAndPort;
57+
import org.apache.cassandra.metrics.MutationTrackingMetrics;
5758
import org.apache.cassandra.net.Message;
5859
import org.apache.cassandra.net.MessagingService;
5960
import org.apache.cassandra.net.Verb;
@@ -330,7 +331,9 @@ public MutationSummary createSummaryForKey(DecoratedKey key, TableId tableId, bo
330331
shardLock.readLock().lock();
331332
try
332333
{
333-
return getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
334+
MutationSummary summary = getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
335+
MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
336+
return summary;
334337
}
335338
finally
336339
{
@@ -370,6 +373,20 @@ void forEachKeyspace(Consumer<KeyspaceShards> consumer)
370373
}
371374
}
372375

376+
public long getUnreconciledMutationCount()
377+
{
378+
if (!isStarted())
379+
return 0L;
380+
381+
final long[] count = {0L};
382+
forEachKeyspace(ks -> {
383+
ks.forEachShard(shard -> {
384+
count[0] += shard.getUnreconciledCount();
385+
});
386+
});
387+
return count[0];
388+
}
389+
373390
public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into)
374391
{
375392
shardLock.readLock().lock();

src/java/org/apache/cassandra/replication/Shard.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,16 @@ private CoordinatorLog getOrCreate(CoordinatorLogId logId)
297297
return getOrCreate(logId.asLong());
298298
}
299299

300+
public long getUnreconciledCount()
301+
{
302+
long count = 0;
303+
for (CoordinatorLog log : logs.values())
304+
{
305+
count += log.getUnreconciledCount();
306+
}
307+
return count;
308+
}
309+
300310
@Nonnull
301311
private CoordinatorLog get(CoordinatorLogId logId)
302312
{

src/java/org/apache/cassandra/replication/UnreconciledMutations.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,11 @@ boolean isEmpty()
228228
return statesMap.isEmpty();
229229
}
230230

231+
public int size()
232+
{
233+
return statesMap.size();
234+
}
235+
231236
static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, int localNodeId)
232237
{
233238
UnreconciledMutations result = new UnreconciledMutations();

0 commit comments

Comments
 (0)