diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index 1b2b459cb8d34..cd2ed674216c2 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -434,6 +434,7 @@ private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerCon topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs())); topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"); + topicConfigs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Short.toString(rlmmConfig.metadataTopicMinIsr())); return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(), rlmmConfig.metadataTopicPartitionsCount(), rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs); diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java index 721d58e34d86f..48ff4055affd0 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java @@ -46,6 +46,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig { public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor"; public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions"; public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = "remote.log.metadata.topic.retention.ms"; + public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP = "remote.log.metadata.topic.min.isr"; public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.consume.wait.ms"; public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = "remote.log.metadata.initialization.retry.max.timeout.ms"; public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms"; @@ -53,6 +54,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig { public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50; public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS = -1L; public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3; + public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR = 2; public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L; public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L; public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 100L; @@ -63,6 +65,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig { "Default: -1, that means unlimited. Users can configure this value based on their use cases. " + "To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " + "tiered storage in the cluster."; + public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC = "The minimum number of replicas that must acknowledge a write to remote log metadata topic."; public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " + "receive the published event."; public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milliseconds for " + @@ -90,6 +93,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig { REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC) .define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW, REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC) + .define(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, SHORT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, atLeast(1), LOW, + REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC) .define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW, REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC) .define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG, @@ -106,6 +111,7 @@ DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW, private final long consumeWaitMs; private final long metadataTopicRetentionMs; private final short metadataTopicReplicationFactor; + private final short metadataTopicMinIsr; private final long initializationRetryMaxTimeoutMs; private final long initializationRetryIntervalMs; @@ -126,6 +132,7 @@ public TopicBasedRemoteLogMetadataManagerConfig(Map props) { if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) { throw new IllegalArgumentException("Invalid metadata topic retention in millis: " + metadataTopicRetentionMs); } + metadataTopicMinIsr = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP); consumeWaitMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP); initializationRetryIntervalMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP); initializationRetryMaxTimeoutMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP); @@ -184,6 +191,10 @@ public long initializationRetryIntervalMs() { return initializationRetryIntervalMs; } + public short metadataTopicMinIsr() { + return metadataTopicMinIsr; + } + public String logDir() { return logDir; }