1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Do not wait for repair interval to pass before cleaning up orphaned auto-repair data (CASSANDRA-20995)
  * Add cqlsh autocompletion for the identity mapping feature (CASSANDRA-20021)
  * Add DDL Guardrail enabling administrators to disallow creation/modification of keyspaces with durable_writes = false (CASSANDRA-20913)
  * Optimize Counter, Meter and Histogram metrics using thread local counters (CASSANDRA-20250)
@@ -188,13 +188,15 @@ public void repair(AutoRepairConfig.RepairType repairType)
     //consistency level to use for local query
     UUID myId = StorageService.instance.getHostIdForEndpoint(FBUtilities.getBroadcastAddressAndPort());
 
-    // If it's too soon to run repair, don't bother checking if it's our turn.
+    // Calculate repair turn first - this also cleans up orphan nodes from auto-repair system tables
+    RepairTurn turn = AutoRepairUtils.myTurnToRunRepair(repairType, myId);
+
+    // Check if it's too soon to run repair after calculating turn to ensure cleanup happens
     if (tooSoonToRunRepair(repairType, repairState, config, myId))
     {
         return;
     }
 
-    RepairTurn turn = AutoRepairUtils.myTurnToRunRepair(repairType, myId);
     if (turn == MY_TURN || turn == MY_TURN_DUE_TO_PRIORITY || turn == MY_TURN_FORCE_REPAIR)
     {
         repairState.recordTurn(turn);
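For context on why the ordering matters: AutoRepairUtils.myTurnToRunRepair is the call that prunes auto_repair_history rows belonging to hosts that have left the ring, so under the old ordering a node that was perpetually "too soon" to repair would never perform that cleanup. A minimal conceptual sketch of the pruning idea follows; the class, method, and data shapes are illustrative assumptions for this sketch, not the actual AutoRepairUtils implementation:

import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class OrphanCleanupSketch
{
    // Conceptual sketch only: drop recorded repair history for any host ID
    // that no longer corresponds to a live ring member. In the real code this
    // happens inside AutoRepairUtils.myTurnToRunRepair, which is why the diff
    // above moves that call ahead of the tooSoonToRunRepair early return.
    static void pruneOrphans(Map<UUID, Long> repairHistoryByHostId, Set<UUID> liveHostIds)
    {
        repairHistoryByHostId.keySet().removeIf(hostId -> !liveHostIds.contains(hostId));
    }
}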
223 changes: 223 additions & 0 deletions AutoRepairOrphanCleanupTest.java
@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.repair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;

import org.apache.cassandra.Util;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.repair.autorepair.AutoRepair;
import org.apache.cassandra.repair.autorepair.AutoRepairConfig;
import org.apache.cassandra.service.AutoRepairService;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.apache.cassandra.schema.SchemaConstants.DISTRIBUTED_KEYSPACE_NAME;
import static org.apache.cassandra.schema.SystemDistributedKeyspace.AUTO_REPAIR_HISTORY;
import static org.junit.Assert.assertEquals;

/**
* Test that verifies orphan nodes are cleaned up from auto_repair_history even when repairs
* are skipped due to min_repair_interval constraints.
*/
public class AutoRepairOrphanCleanupTest extends TestBaseImpl
{
private static Cluster cluster;

@BeforeClass
public static void init() throws IOException
{
// Configure a 3-node cluster with auto_repair enabled but with a very high min_repair_interval
// This ensures that when we test, repairs will be skipped due to "too soon to repair"
cluster = Cluster.build(3)
.withTokenCount(4)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3, 4))
.withConfig(config -> config
.set("num_tokens", 4)
.set("auto_repair",
ImmutableMap.of(
"repair_check_interval", "1s",
"repair_type_overrides",
ImmutableMap.of(AutoRepairConfig.RepairType.FULL.getConfigName(),
ImmutableMap.builder()
.put("initial_scheduler_delay", "0s")
.put("enabled", "true")
// Set very high min_repair_interval
// to ensure repairs are skipped
.put("min_repair_interval", "24h")
.put("allow_parallel_replica_repair", "true")
.put("repair_by_keyspace", "true")
.build())))
.set("auto_repair.enabled", "true"))
.start();
}

@AfterClass
public static void tearDown()
{
cluster.close();
}

@Test
public void testOrphanNodeCleanupWhenRepairSkipped()
{
// Insert one auto-repair record for each of the 3 live nodes and one for an orphan node
List<UUID> liveHostIds = new ArrayList<>();
for (int i = 1; i <= 3; i++)
{
liveHostIds.add(
cluster.get(i).callOnInstance(() ->
StorageService.instance.getHostIdForEndpoint(
FBUtilities.getBroadcastAddressAndPort())));
}
UUID orphanHostId = UUID.randomUUID();

long currentTime = System.currentTimeMillis();
// Orphan node: oldest finish time, so it is next in line to run repair
long orphanStart = currentTime - TimeUnit.HOURS.toMillis(4); // 4 hours ago
long orphanFinish = currentTime - TimeUnit.HOURS.toMillis(3); // 3 hours ago

// Live nodes: more recent finish times
long[] liveStart = {
currentTime - TimeUnit.HOURS.toMillis(3), // 3 hours ago
currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago
currentTime - TimeUnit.HOURS.toMillis(1) // 1 hour ago
};
long[] liveFinish = {
currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago
currentTime - TimeUnit.HOURS.toMillis(1), // 1 hour ago
currentTime // now
};

// Insert live node records
for (int i = 0; i < 3; i++)
{
cluster.coordinator(1).execute(String.format(
"INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, repair_finish_ts, repair_turn) " +
"VALUES ('%s', %s, %d, %d, '%s')",
DISTRIBUTED_KEYSPACE_NAME,
AUTO_REPAIR_HISTORY,
AutoRepairConfig.RepairType.FULL,
liveHostIds.get(i),
liveStart[i],
liveFinish[i],
"NOT_MY_TURN"
), ConsistencyLevel.QUORUM);
}

// Insert orphan node record (should be next in line to run repair)
cluster.coordinator(1).execute(String.format(
"INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, repair_finish_ts, repair_turn) " +
"VALUES ('%s', %s, %d, %d, '%s')",
DISTRIBUTED_KEYSPACE_NAME,
AUTO_REPAIR_HISTORY,
AutoRepairConfig.RepairType.FULL,
orphanHostId,
orphanStart,
orphanFinish,
"NOT_MY_TURN"
), ConsistencyLevel.QUORUM);

Review comment (Contributor): We should validate the orphan node's entry and the three live nodes' entries by doing a SELECT query before starting the auto-repair.
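One possible shape of that pre-check, reusing the identifiers already in scope in this test (the assertion below sketches out the reviewer's suggestion; it is not part of the PR as posted):

// Hypothetical pre-check per the review comment above: confirm all four
// seeded records (three live nodes plus the orphan) are visible before
// auto-repair is started.
Object[][] seeded = cluster.coordinator(1).execute(
    String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'",
                  DISTRIBUTED_KEYSPACE_NAME,
                  AUTO_REPAIR_HISTORY,
                  AutoRepairConfig.RepairType.FULL),
    ConsistencyLevel.QUORUM);
assertEquals("Expected 3 live node records plus 1 orphan record before starting auto-repair",
             4, seeded.length);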

// Once the auto_repair_history table is prepared, initialize auto-repair service on all nodes
cluster.forEach(i -> i.runOnInstance(() -> {
try
{
AutoRepairService.setup();
AutoRepair.instance.setup();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}));

// Wait for at least one auto-repair cycle to allow orphan cleanup to run
// (auto_repair.repair_check_interval is set to 1s, so 10s is plenty)
Util.spinAssertEquals(true, () -> {
Object[][] rows = cluster.coordinator(1).execute(
String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'",
DISTRIBUTED_KEYSPACE_NAME,
AUTO_REPAIR_HISTORY,
AutoRepairConfig.RepairType.FULL),
ConsistencyLevel.QUORUM
);
// The orphanHostId should not be present in the results
for (Object[] row : rows)
{
UUID hostId = (UUID) row[0];
if (hostId.equals(orphanHostId))
return false;
}
return true;
}, 10_000);

// Query all records for this repair type
Object[][] rows = cluster.coordinator(1).execute(
String.format("SELECT host_id, repair_start_ts, repair_finish_ts FROM %s.%s WHERE repair_type = '%s'",
DISTRIBUTED_KEYSPACE_NAME,
AUTO_REPAIR_HISTORY,
AutoRepairConfig.RepairType.FULL),
ConsistencyLevel.QUORUM
);

// Check that the orphan node's record is gone, and live nodes' records remain with unchanged timestamps
Map<UUID, long[]> actualLiveTimestamps = new HashMap<>();
for (Object[] row : rows)
{
UUID hostId = (UUID) row[0];
long startTs = ((Date) row[1]).getTime();
long finishTs = ((Date) row[2]).getTime();
if (hostId.equals(orphanHostId))
{
throw new AssertionError("Orphan node record was not cleaned up");
}
actualLiveTimestamps.put(hostId, new long[]{startTs, finishTs});
}

// All live nodes should still be present with their original timestamps
assertEquals("Unexpected number of live node records", 3, actualLiveTimestamps.size());
for (int i = 0; i < 3; i++)
{
UUID hostId = liveHostIds.get(i);
long[] expected = new long[]{liveStart[i], liveFinish[i]};
long[] actual = actualLiveTimestamps.get(hostId);
if (actual == null)
throw new AssertionError("Live node record missing for hostId: " + hostId);
assertEquals("Live node repair_start_ts changed for hostId: " + hostId, expected[0], actual[0]);
assertEquals("Live node repair_finish_ts changed for hostId: " + hostId, expected[1], actual[1]);
}
}
}