Skip to content

Commit ed64f94

Browse files
committed
KAFKA-19858: Set default min.insync.replicas=2 for __remote_log_metadata topic to prevent data loss
1 parent 8900a18 commit ed64f94

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerCon
434434
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs()));
435435
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
436436
topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
437+
topicConfigs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Short.toString(rlmmConfig.metadataTopicMinIsr()));
437438
return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
438439
rlmmConfig.metadataTopicPartitionsCount(),
439440
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
4646
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor";
4747
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions";
4848
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = "remote.log.metadata.topic.retention.ms";
49+
public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP = "remote.log.metadata.topic.min.isr";
4950
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.consume.wait.ms";
5051
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = "remote.log.metadata.initialization.retry.max.timeout.ms";
5152
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms";
5253

5354
public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
5455
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS = -1L;
5556
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
57+
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR = 2;
5658
public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L;
5759
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L;
5860
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 100L;
@@ -63,6 +65,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
6365
"Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
6466
"To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
6567
"tiered storage in the cluster.";
68+
public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC = "The minimum number of replicas that must acknowledge a write to remote log metadata topic.";
6669
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " +
6770
"receive the published event.";
6871
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milliseconds for " +
@@ -90,6 +93,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
9093
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
9194
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW,
9295
REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
96+
.define(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, SHORT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, atLeast(1), LOW,
97+
REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC)
9398
.define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
9499
REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
95100
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
@@ -106,6 +111,7 @@ DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW,
106111
private final long consumeWaitMs;
107112
private final long metadataTopicRetentionMs;
108113
private final short metadataTopicReplicationFactor;
114+
private final short metadataTopicMinIsr;
109115
private final long initializationRetryMaxTimeoutMs;
110116
private final long initializationRetryIntervalMs;
111117

@@ -126,6 +132,7 @@ public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
126132
if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
127133
throw new IllegalArgumentException("Invalid metadata topic retention in millis: " + metadataTopicRetentionMs);
128134
}
135+
metadataTopicMinIsr = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP);
129136
consumeWaitMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
130137
initializationRetryIntervalMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
131138
initializationRetryMaxTimeoutMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
@@ -184,6 +191,10 @@ public long initializationRetryIntervalMs() {
184191
return initializationRetryIntervalMs;
185192
}
186193

194+
public short metadataTopicMinIsr() {
195+
return metadataTopicMinIsr;
196+
}
197+
187198
public String logDir() {
188199
return logDir;
189200
}

0 commit comments

Comments
 (0)